wrapyfi.publishers package

Submodules

wrapyfi.publishers.mqtt module

class wrapyfi.publishers.mqtt.MqttPublisher(name: str, out_topic: str, should_wait: bool = True, broker_address: str = 'broker.emqx.io', broker_port: int = 1883, mqtt_kwargs: dict | None = None, **kwargs)[source]

Bases: Publisher

__init__(name: str, out_topic: str, should_wait: bool = True, broker_address: str = 'broker.emqx.io', broker_port: int = 1883, mqtt_kwargs: dict | None = None, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • broker_address – str: Address of the MQTT broker. Default is ‘broker.emqx.io’

  • broker_port – int: Port of the MQTT broker. Default is 1883

  • mqtt_kwargs – dict: Additional kwargs for the MQTT middleware

  • kwargs – dict: Additional kwargs for the publisher

await_connection(out_topic: str | None = None, repeats: int | None = None)[source]

Wait for the connection to be established.

Parameters:
  • out_topic – str: Name of the output topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise
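
The repeats semantics described above (a bounded number of attempts, or None for an unbounded wait) can be sketched as a small polling loop. This is only an illustration under stated assumptions: `await_with_repeats`, `is_connected` and `delay` are hypothetical names, and the loop is not taken from Wrapyfi's source.

```python
import time
from typing import Callable, Optional


def await_with_repeats(
    is_connected: Callable[[], bool],
    repeats: Optional[int] = None,
    delay: float = 0.0,
) -> bool:
    """Poll is_connected until it returns True or the attempts run out.

    Hypothetical helper: repeats=None polls indefinitely, mirroring the
    documented default; an integer bounds the number of attempts.
    """
    attempt = 0
    while repeats is None or attempt < repeats:
        if is_connected():
            return True
        attempt += 1
        time.sleep(delay)
    return False


# A fake connection check that succeeds on the third poll.
polls = iter([False, False, True])
connected_in_time = await_with_repeats(lambda: next(polls), repeats=5)

# With only two attempts allowed, the same check gives up first.
polls_short = iter([False, False, True])
gave_up = await_with_repeats(lambda: next(polls_short), repeats=2)
```

A bounded repeats value is useful in scripts that should fail fast when no listener appears, while None suits long-running nodes.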

close()[source]

Close the publisher.

class wrapyfi.publishers.mqtt.MqttNativeObjectPublisher(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, serializer_kwargs: dict | None = None, **kwargs)[source]

Bases: MqttPublisher

__init__(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, serializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObjectPublisher using the MQTT message construct assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate client for each thread. Default is False

  • serializer_kwargs – dict: Additional kwargs for the serializer

  • kwargs – dict: Additional kwargs for the publisher
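
One way to picture the multi_threaded flag is a client holder that either shares one client or creates one per thread. The sketch below uses `threading.local` for that; `PerThreadClients` and `count_distinct_clients` are hypothetical names and a plain `object` stands in for an MQTT client, so this is an assumption about the idea rather than Wrapyfi's actual mechanism.

```python
import threading


class PerThreadClients:
    """Hypothetical holder: one shared client, or one client per thread."""

    def __init__(self, factory, multi_threaded=False):
        self._factory = factory
        self._multi_threaded = multi_threaded
        self._shared = None
        self._local = threading.local()
        self._lock = threading.Lock()

    def get(self):
        if self._multi_threaded:
            # Each thread lazily creates and then reuses its own client.
            if not hasattr(self._local, "client"):
                self._local.client = self._factory()
            return self._local.client
        with self._lock:
            # All threads share a single lazily created client.
            if self._shared is None:
                self._shared = self._factory()
            return self._shared


def count_distinct_clients(multi_threaded, n_threads=3):
    """Run n_threads workers and count how many distinct clients they saw."""
    clients = PerThreadClients(object, multi_threaded)
    seen = []  # keeps the clients alive so their ids stay unique

    def worker():
        seen.append(clients.get())

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len({id(c) for c in seen})
```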

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: Object to publish
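
The idea of serializing native objects with an encoder and sending them as a string can be illustrated with the standard `json` module. The encoder below is a hypothetical stand-in (`SketchEncoder` and `decode_hook` are not Wrapyfi names) that tags two types plain JSON cannot represent; Wrapyfi's own encoder and plugins may work differently.

```python
import base64
import json


class SketchEncoder(json.JSONEncoder):
    """Hypothetical encoder: tags types that plain JSON cannot represent."""

    def default(self, o):
        if isinstance(o, bytes):
            return {"__type__": "bytes", "data": base64.b64encode(o).decode("ascii")}
        if isinstance(o, set):
            return {"__type__": "set", "data": sorted(o)}
        return super().default(o)


def decode_hook(d):
    """Reverse the tagging applied by SketchEncoder on the receiving side."""
    if d.get("__type__") == "bytes":
        return base64.b64decode(d["data"])
    if d.get("__type__") == "set":
        return set(d["data"])
    return d


obj = {"label": "frame", "ids": {3, 1, 2}, "raw": b"\x00\x01"}
payload = json.dumps(obj, cls=SketchEncoder)  # the string that would be sent
restored = json.loads(payload, object_hook=decode_hook)
```

Because the payload is an ordinary string, it can travel over any middleware that carries text messages.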

class wrapyfi.publishers.mqtt.MqttImagePublisher(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

Bases: MqttNativeObjectPublisher

__init__(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

The ImagePublisher using the MQTT message construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate client for each thread. Default is False

  • width – int: Width of the image. Default is -1 meaning that the width is not fixed

  • height – int: Height of the image. Default is -1 meaning that the height is not fixed

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be compressed as JPG. Default is False

publish(img: numpy.ndarray)[source]

Publish the image to the middleware.

Parameters:

img – np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels]
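
The width, height and rgb settings suggest a shape check on the incoming array, with -1 meaning "not fixed". A minimal sketch of such a check follows; it works on a shape tuple (as returned by `img.shape`) so that it runs without numpy, and `shape_is_accepted` is a hypothetical helper whose exact rules are assumptions, not Wrapyfi's validation logic.

```python
from typing import Tuple


def shape_is_accepted(
    shape: Tuple[int, ...], width: int = -1, height: int = -1, rgb: bool = True
) -> bool:
    """Check a shape (img_height, img_width[, channels]) against the settings.

    Hypothetical rules: -1 leaves a dimension unconstrained, RGB expects
    3 channels and grayscale expects a 2D array or a single channel.
    """
    if len(shape) not in (2, 3):
        return False
    img_height, img_width = shape[0], shape[1]
    channels = shape[2] if len(shape) == 3 else 1
    if width != -1 and img_width != width:
        return False
    if height != -1 and img_height != height:
        return False
    return channels == (3 if rgb else 1)
```

For example, `shape_is_accepted((480, 640, 3), width=640, height=480)` passes, while the same shape with `width=320` is rejected.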

class wrapyfi.publishers.mqtt.MqttAudioChunkPublisher(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: MqttNativeObjectPublisher

__init__(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunkPublisher using the MQTT message construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate client for each thread. Default is False

  • channels – int: Number of channels. Default is 1

  • rate – int: Sampling rate. Default is 44100

  • chunk – int: Chunk size. Default is -1 meaning that the chunk size is not fixed

publish(aud: Tuple[numpy.ndarray, int])[source]

Publish the audio chunk to the middleware.

Parameters:

aud – Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate])
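
The audio tuple pairs a chunk of shape (audio_chunk, channels) with its sampling rate. The sketch below checks such a tuple against the constructor settings and derives the chunk duration as frames divided by rate. Both helpers are hypothetical, operate on the shape tuple rather than a numpy array, and assume checks that Wrapyfi itself may or may not perform.

```python
from typing import Tuple


def audio_is_accepted(
    shape: Tuple[int, int],
    samplerate: int,
    channels: int = 1,
    rate: int = 44100,
    chunk: int = -1,
) -> bool:
    """Hypothetical check of (frames, channels) and samplerate against settings."""
    frames, n_channels = shape
    if n_channels != channels or samplerate != rate:
        return False
    # chunk == -1 means the chunk size is not fixed.
    return chunk == -1 or frames == chunk


def chunk_duration_seconds(frames: int, rate: int) -> float:
    """Duration covered by one chunk: number of frames over frames per second."""
    return frames / rate
```

At the default rate of 44100, a chunk of 4410 frames covers a tenth of a second.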

class wrapyfi.publishers.mqtt.MqttPropertiesPublisher(name, out_topic, **kwargs)[source]

Bases: MqttPublisher

__init__(name, out_topic, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • broker_address – str: Address of the MQTT broker. Default is ‘broker.emqx.io’

  • broker_port – int: Port of the MQTT broker. Default is 1883

  • mqtt_kwargs – dict: Additional kwargs for the MQTT middleware

  • kwargs – dict: Additional kwargs for the publisher

wrapyfi.publishers.ros module

class wrapyfi.publishers.ros.ROSPublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, ros_kwargs: dict | None = None, **kwargs)[source]

Bases: Publisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, ros_kwargs: dict | None = None, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • ros_kwargs – dict: Additional kwargs for the ROS middleware

  • kwargs – dict: Additional kwargs for the publisher

await_connection(publisher, out_topic: str | None = None, repeats: int | None = None)[source]

Wait for at least one subscriber to connect to the publisher.

Parameters:
  • publisher – rospy.Publisher: Publisher to await connection to

  • out_topic – str: Name of the output topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

close()[source]

Close the publisher.

class wrapyfi.publishers.ros.ROSNativeObjectPublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, serializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ROSPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, serializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject publisher using the ROS String message assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • serializer_kwargs – dict: Additional kwargs for the serializer

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: Object to publish

class wrapyfi.publishers.ros.ROSImagePublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

Bases: ROSPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

The ImagePublisher using the ROS Image message assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • width – int: Width of the image. Default is -1 meaning that the width is not fixed

  • height – int: Height of the image. Default is -1 meaning that the height is not fixed

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be compressed as JPG. Default is False

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(img: numpy.ndarray)[source]

Publish the image to the middleware.

Parameters:

img – np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.publishers.ros.ROSAudioChunkPublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ROSPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunkPublisher using the ROS Audio message assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • channels – int: Number of channels. Default is 1

  • rate – int: Sampling rate. Default is 44100

  • chunk – int: Chunk size. Default is -1 meaning that the chunk size is not fixed

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(aud: Tuple[numpy.ndarray, int])[source]

Publish the audio chunk to the middleware.

Parameters:

aud – Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.publishers.ros.ROSPropertiesPublisher(name: str, out_topic: str, carrier: str = 'tcp', persistent: bool = True, **kwargs)[source]

Bases: ROSPublisher

Sets rospy parameters. Unlike the publishers for other data types, it writes directly to the ROS parameter server. Note that the listener is not guaranteed to receive the updated value, since the listener can trigger before the property is set. The return values of the property-decorated method may be native python objects (excluding None), but care should be taken when returning dictionaries, since they are analogous to node namespaces: http://wiki.ros.org/rospy/Overview/Parameter%20Server

__init__(name: str, out_topic: str, carrier: str = 'tcp', persistent: bool = True, **kwargs)[source]

The PropertiesPublisher using the ROS parameter server.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • persistent – bool: True if the parameter should be kept on closing node, False if it should be deleted or reset to its state before the node was started. Default is True

establish(**kwargs)[source]

Store the original property value in case it needs to be reset.

publish(obj)[source]

Publish the property to the middleware (parameter server).

Parameters:

obj – object: Property to publish. If dict, will be set as a namespace

close()[source]

Close the publisher and reset the property to its original value if not persistent.
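
The persistent flag and the establish/close pair describe a save-and-restore pattern. The sketch below reproduces it with a plain dict standing in for the parameter server; `SketchPropertyPublisher` is a hypothetical class and makes no rospy calls, so treat it as an illustration of the documented behavior rather than the actual implementation.

```python
class SketchPropertyPublisher:
    """Hypothetical stand-in: a dict plays the role of the parameter server."""

    _MISSING = object()  # marks "the key did not exist before we started"

    def __init__(self, server: dict, key: str, persistent: bool = True):
        self.server = server
        self.key = key
        self.persistent = persistent
        self._original = self._MISSING

    def establish(self):
        # Remember the original value in case it must be restored on close.
        self._original = self.server.get(self.key, self._MISSING)

    def publish(self, value):
        self.server[self.key] = value

    def close(self):
        if self.persistent:
            return  # keep whatever was published
        if self._original is self._MISSING:
            self.server.pop(self.key, None)  # delete a key we introduced
        else:
            self.server[self.key] = self._original  # reset to prior state


server = {"/gain": 1.0}
pub = SketchPropertyPublisher(server, "/gain", persistent=False)
pub.establish()
pub.publish(2.5)
value_while_open = server["/gain"]
pub.close()
value_after_close = server["/gain"]
```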

class wrapyfi.publishers.ros.ROSMessagePublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

Bases: ROSPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

The ROSMessagePublisher using the ROS message type inferred from the message object being published. Supports standard ROS msgs.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

establish(repeats: int | None = None, obj=None, **kwargs)[source]

Establish the connection and import the message requirements.

Parameters:
  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

  • obj – object: Object to establish the connection to

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: ROS message to publish

wrapyfi.publishers.ros2 module

class wrapyfi.publishers.ros2.ROS2Publisher(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, ros2_kwargs: dict | None = None, **kwargs)[source]

Bases: Publisher, Node

__init__(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, ros2_kwargs: dict | None = None, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • ros2_kwargs – dict: Additional kwargs for the ROS 2 middleware

  • kwargs – dict: Additional kwargs for the publisher

await_connection(publisher, out_topic: str | None = None, repeats: int | None = None)[source]

Wait for at least one subscriber to connect to the publisher.

Parameters:
  • publisher – rclpy.publisher.Publisher: Publisher to await connection to

  • out_topic – str: Name of the output topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

close()[source]

Close the publisher.

class wrapyfi.publishers.ros2.ROS2NativeObjectPublisher(name, out_topic: str, should_wait: bool = True, queue_size: int = 5, serializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ROS2Publisher

__init__(name, out_topic: str, should_wait: bool = True, queue_size: int = 5, serializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject publisher using the ROS 2 String message assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • serializer_kwargs – dict: Additional kwargs for the serializer

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: Object to publish

class wrapyfi.publishers.ros2.ROS2ImagePublisher(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

Bases: ROS2Publisher

__init__(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

The ImagePublisher using the ROS 2 Image message assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • width – int: Width of the image. Default is -1 meaning that the width is not fixed

  • height – int: Height of the image. Default is -1 meaning that the height is not fixed

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be compressed as JPG. Default is False

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(img: numpy.ndarray)[source]

Publish the image to the middleware.

Parameters:

img – np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.publishers.ros2.ROS2AudioChunkPublisher(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ROS2Publisher

__init__(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunkPublisher using the ROS 2 Audio message assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • channels – int: Number of channels. Default is 1

  • rate – int: Sampling rate. Default is 44100

  • chunk – int: Chunk size. Default is -1 meaning that the chunk size is not fixed

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(aud: Tuple[numpy.ndarray, int])[source]

Publish the audio chunk to the middleware.

Parameters:

aud – Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.publishers.ros2.ROS2PropertiesPublisher(name, out_topic, **kwargs)[source]

Bases: ROS2Publisher

__init__(name, out_topic, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

  • ros2_kwargs – dict: Additional kwargs for the ROS 2 middleware

  • kwargs – dict: Additional kwargs for the publisher

class wrapyfi.publishers.ros2.ROS2MessagePublisher(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

Bases: ROS2Publisher

__init__(name: str, out_topic: str, should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

The ROS2MessagePublisher using the ROS 2 message type determined dynamically.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • queue_size – int: Queue size for the publisher. Default is 5

get_message_type(msg)[source]

Get the type of a specific message.

Parameters:

msg – ROS 2 message object

Returns:

type: The type of the provided message

establish(msg, repeats: int | None = None, **kwargs)[source]

Establish the connection using the provided message to determine the type.

Parameters:
  • msg – ROS2Message: Message to determine the type.

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(msg)[source]

Publish the data to the middleware.

Parameters:

msg – ROS2Message: Message to publish. This should be formatted according to the expected message type.
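
Determining the message type dynamically amounts to reading the type from the first message and holding later messages to it. The sketch below shows that flow with a dataclass in place of a real ROS 2 message; `FakeStringMsg` and `LazyTypedPublisher` are hypothetical, no rclpy calls are made, and the type check in publish is an assumption added for illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeStringMsg:
    """Hypothetical stand-in for a ROS 2 message class."""
    data: str = ""


def get_message_type(msg):
    """Return the class of the given message object."""
    return type(msg)


class LazyTypedPublisher:
    """Hypothetical publisher that fixes its message type on first use."""

    def __init__(self):
        self.msg_type = None
        self.sent = []  # stands in for the middleware

    def establish(self, msg) -> bool:
        self.msg_type = get_message_type(msg)
        return True

    def publish(self, msg):
        if self.msg_type is None:
            self.establish(msg)
        if not isinstance(msg, self.msg_type):
            raise TypeError(f"expected {self.msg_type.__name__}")
        self.sent.append(msg)


lazy_pub = LazyTypedPublisher()
lazy_pub.publish(FakeStringMsg(data="hello"))
inferred_type = lazy_pub.msg_type
```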

wrapyfi.publishers.websocket module

class wrapyfi.publishers.websocket.WebSocketPublisher(name: str, out_topic: str, should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_port: int = 5000, namespace: str = '/', monitor_listener_spawn: str | None = 'thread', websocket_kwargs: dict | None = None, **kwargs)[source]

Bases: Publisher

__init__(name: str, out_topic: str, should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_port: int = 5000, namespace: str = '/', monitor_listener_spawn: str | None = 'thread', websocket_kwargs: dict | None = None, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_port – int: Port of the socket for publishing. Default is 5000

  • namespace – str: Namespace of the WebSocket. Default is ‘/’

  • monitor_listener_spawn – str: Whether to spawn the monitor listener as a process or thread. Default is ‘thread’

  • websocket_kwargs – dict: Additional kwargs for the WebSocket middleware

  • kwargs – dict: Additional kwargs for the publisher

await_connection(out_topic: str | None = None, repeats: int | None = None)[source]

Wait for the connection to be established.

Parameters:
  • out_topic – str: Name of the output topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

close()[source]

Close the publisher.

class wrapyfi.publishers.websocket.WebSocketNativeObjectPublisher(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, serializer_kwargs: dict | None = None, **kwargs)[source]

Bases: WebSocketPublisher

__init__(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, serializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObjectPublisher using the WebSocket message construct assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate socket for each thread. Default is False

  • serializer_kwargs – dict: Additional kwargs for the serializer

  • kwargs – dict: Additional kwargs for the publisher

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: Object to publish

class wrapyfi.publishers.websocket.WebSocketImagePublisher(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

Bases: WebSocketNativeObjectPublisher

__init__(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

The ImagePublisher using the WebSocket message construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate socket for each thread. Default is False

  • width – int: Width of the image. Default is -1 meaning that the width is not fixed

  • height – int: Height of the image. Default is -1 meaning that the height is not fixed

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be compressed as JPG. Default is False

publish(img: numpy.ndarray)[source]

Publish the image to the middleware.

Parameters:

img – np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.publishers.websocket.WebSocketAudioChunkPublisher(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: WebSocketNativeObjectPublisher

__init__(name: str, out_topic: str, should_wait: bool = True, multi_threaded: bool = False, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunkPublisher using the WebSocket message construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate socket for each thread. Default is False

  • channels – int: Number of channels. Default is 1

  • rate – int: Sampling rate. Default is 44100

  • chunk – int: Chunk size. Default is -1 meaning that the chunk size is not fixed

publish(aud: Tuple[numpy.ndarray, int])[source]

Publish the audio chunk to the middleware.

Parameters:

aud – Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.publishers.websocket.WebSocketPropertiesPublisher(name, out_topic, **kwargs)[source]

Bases: WebSocketPublisher

__init__(name, out_topic, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic (e.g. ‘topic’)

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_port – int: Port of the socket for publishing. Default is 5000

  • namespace – str: Namespace of the WebSocket. Default is ‘/’

  • monitor_listener_spawn – str: Whether to spawn the monitor listener as a process or thread. Default is ‘thread’

  • websocket_kwargs – dict: Additional kwargs for the WebSocket middleware

  • kwargs – dict: Additional kwargs for the publisher

wrapyfi.publishers.yarp module

class wrapyfi.publishers.yarp.YarpPublisher(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, yarp_kwargs: dict | None = None, **kwargs)[source]

Bases: Publisher

__init__(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, yarp_kwargs: dict | None = None, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • persistent – bool: Whether the publisher port should remain connected after closure. Default is True

  • out_topic_connect – str: Name of the output topic connection alias preceded by ‘/’ (e.g. ‘/topic:out’) to connect to. None appends ‘:out’ to the out_topic. Default is None

  • yarp_kwargs – dict: Additional kwargs for the Yarp middleware

  • kwargs – dict: Additional kwargs for the publisher
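
The default for out_topic_connect can be expressed in one line: when it is None, ‘:out’ is appended to out_topic. The helper below is a hypothetical illustration of that rule (`resolve_out_topic_connect` is not a Wrapyfi function).

```python
from typing import Optional


def resolve_out_topic_connect(
    out_topic: str, out_topic_connect: Optional[str] = None
) -> str:
    """Return the connection alias, defaulting to out_topic + ':out'."""
    if out_topic_connect is None:
        return out_topic + ":out"
    return out_topic_connect
```

With this rule, `resolve_out_topic_connect("/topic")` yields ‘/topic:out’, while an explicit alias is returned unchanged.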

await_connection(port, out_topic: str | None = None, repeats: int | None = None)[source]

Wait for at least one subscriber to connect to the publisher.

Parameters:
  • port – yarp.Port: Port to await connection to

  • out_topic – str: Name of the output topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

close()[source]

Close the publisher.

class wrapyfi.publishers.yarp.YarpNativeObjectPublisher(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, serializer_kwargs: dict | None = None, **kwargs)[source]

Bases: YarpPublisher

__init__(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, serializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject publisher using the BufferedPortBottle string construct assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • persistent – bool: Whether the publisher port should remain connected after closure. Default is True

  • out_topic_connect – str: Name of the output topic connection alias preceded by ‘/’ (e.g. ‘/topic:out’) to connect to. If None, ‘:out’ is appended to out_topic. Default is None

  • serializer_kwargs – dict: Additional kwargs for the serializer

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: Object to publish

class wrapyfi.publishers.yarp.YarpImagePublisher(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

Bases: YarpPublisher

__init__(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

The Image publisher using the BufferedPortImage construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • persistent – bool: Whether the publisher port should remain connected after closure. Default is True

  • out_topic_connect – str: Name of the output topic connection alias preceded by ‘/’ (e.g. ‘/topic:out’) to connect to. If None, ‘:out’ is appended to out_topic. Default is None

  • width – int: Width of the image. Default is -1 meaning the width of the input image

  • height – int: Height of the image. Default is -1 meaning the height of the input image

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be compressed as JPG. Default is False

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(img: numpy.ndarray)[source]

Publish the image to the middleware.

Parameters:

img – np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels]
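
The interplay between the width, height, and rgb parameters and the [img_height, img_width, channels] layout can be sketched as a shape check. This is an illustration under stated assumptions, not wrapyfi's validation code: image_shape_matches is a hypothetical name, and the sketch assumes an RGB image has exactly three channels and a grayscale image is two-dimensional. It takes a plain shape tuple (such as the value of an array's shape attribute) so it runs without NumPy.

```python
from typing import Tuple


def image_shape_matches(
    shape: Tuple[int, ...], width: int = -1, height: int = -1, rgb: bool = True
) -> bool:
    """Hypothetical check (not wrapyfi's code) on an image's shape tuple."""
    if len(shape) < 2:
        return False
    # cv2-style layout: shape[0] is the height, shape[1] is the width
    if height > 0 and shape[0] != height:
        return False
    if width > 0 and shape[1] != width:
        return False
    if rgb:
        # Assumption: an RGB image carries exactly three channels
        return len(shape) == 3 and shape[2] == 3
    # Assumption: a grayscale image is a 2-D array
    return len(shape) == 2


# A 480x640 RGB frame checked against a publisher configured for 640x480
ok = image_shape_matches((480, 640, 3), width=640, height=480, rgb=True)
print(ok)
```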

class wrapyfi.publishers.yarp.YarpAudioChunkPublisher(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: YarpPublisher

__init__(name: str, out_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, out_topic_connect: str | None = None, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk publisher using the Sound construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • persistent – bool: Whether the publisher port should remain connected after closure. Default is True

  • out_topic_connect – str: Name of the output topic connection alias preceded by ‘/’ (e.g. ‘/topic:out’) to connect to. If None, ‘:out’ is appended to out_topic. Default is None

  • channels – int: Number of channels. Default is 1

  • rate – int: Sampling rate. Default is 44100

  • chunk – int: Chunk size. Default is -1 meaning that the chunk size is not fixed

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(aud: Tuple[numpy.ndarray, int])[source]

Publish the audio chunk to the middleware.

Parameters:

aud – Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[chunk_size, channels], int[samplerate])
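
To relate the chunk and rate parameters, the duration covered by one chunk is the chunk size divided by the sampling rate. The sketch below works on a plain (chunk_size, channels) shape tuple; chunk_duration_seconds is a hypothetical helper and not part of wrapyfi.

```python
from typing import Tuple


def chunk_duration_seconds(chunk_shape: Tuple[int, int], rate: int = 44100) -> float:
    """Hypothetical helper (not part of wrapyfi): duration of one audio chunk."""
    # The channel count does not affect the duration
    chunk_size, _channels = chunk_shape
    return chunk_size / rate


# A mono chunk of 4410 samples at the default 44100 Hz rate lasts a tenth of a second
duration = chunk_duration_seconds((4410, 1))
print(duration)
```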

class wrapyfi.publishers.yarp.YarpPropertiesPublisher(name, out_topic, **kwargs)[source]

Bases: YarpPublisher

__init__(name, out_topic, **kwargs)[source]

Initialize the publisher.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • persistent – bool: Whether the publisher port should remain connected after closure. Default is True

  • out_topic_connect – str: Name of the output topic connection alias preceded by ‘/’ (e.g. ‘/topic:out’) to connect to. If None, ‘:out’ is appended to out_topic. Default is None

  • yarp_kwargs – dict: Additional kwargs for the Yarp middleware

  • kwargs – dict: Additional kwargs for the publisher

wrapyfi.publishers.zeromq module

class wrapyfi.publishers.zeromq.ZeroMQPublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_pub_port: int = 5555, socket_sub_port: int = 5556, start_proxy_broker: bool = True, proxy_broker_spawn: str = 'process', pubsub_monitor_topic: str = 'ZEROMQ/CONNECTIONS', pubsub_monitor_listener_spawn: str | None = 'process', zeromq_kwargs: dict | None = None, **kwargs)[source]

Bases: Publisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_pub_port: int = 5555, socket_sub_port: int = 5556, start_proxy_broker: bool = True, proxy_broker_spawn: str = 'process', pubsub_monitor_topic: str = 'ZEROMQ/CONNECTIONS', pubsub_monitor_listener_spawn: str | None = 'process', zeromq_kwargs: dict | None = None, **kwargs)[source]

Initialize the publisher and start the proxy broker if necessary.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_pub_port – int: Port of the socket for publishing. Default is 5555

  • socket_sub_port – int: Port of the socket for subscribing. Default is 5556

  • start_proxy_broker – bool: Whether to start a proxy broker. Default is True

  • proxy_broker_spawn – str: Whether to spawn the proxy broker as a process or thread. Default is ‘process’

  • pubsub_monitor_topic – str: Topic to monitor the connections. Default is ‘ZEROMQ/CONNECTIONS’

  • pubsub_monitor_listener_spawn – str: Whether to spawn the PUB/SUB monitor listener as a process or thread. Default is ‘process’

  • zeromq_kwargs – dict: Additional kwargs for the ZeroMQ Pub/Sub middleware

  • kwargs – dict: Additional kwargs for the publisher
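
ZeroMQ endpoints are written as transport://address:port, so the carrier, socket_ip, and port parameters above can be combined into endpoint strings as sketched here. The helper zeromq_endpoint is hypothetical; how wrapyfi assembles its addresses internally is an assumption.

```python
def zeromq_endpoint(carrier: str = "tcp", socket_ip: str = "127.0.0.1", port: int = 5555) -> str:
    """Hypothetical helper (not part of wrapyfi): format a ZeroMQ endpoint string."""
    return f"{carrier}://{socket_ip}:{port}"


# Endpoints built from the documented default ports
pub_address = zeromq_endpoint(port=5555)
sub_address = zeromq_endpoint(port=5556)
print(pub_address, sub_address)
```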

await_connection(out_topic: str | None = None, repeats: int | None = None)[source]

Wait for the connection to be established.

Parameters:
  • out_topic – str: Name of the output topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

close()[source]

Close the publisher.

class wrapyfi.publishers.zeromq.ZeroMQNativeObjectPublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, multi_threaded: bool = False, serializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ZeroMQPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, multi_threaded: bool = False, serializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObjectPublisher using the ZeroMQ message construct assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate socket for each thread. Default is False

  • serializer_kwargs – dict: Additional kwargs for the serializer

  • kwargs – dict: Additional kwargs for the publisher

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

publish(obj)[source]

Publish the object to the middleware.

Parameters:

obj – object: Object to publish

class wrapyfi.publishers.zeromq.ZeroMQImagePublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, multi_threaded: bool = False, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

Bases: ZeroMQNativeObjectPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, multi_threaded: bool = False, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool = False, **kwargs)[source]

The ImagePublisher using the ZeroMQ message construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate socket for each thread. Default is False

  • width – int: Width of the image. Default is -1 meaning that the width is not fixed

  • height – int: Height of the image. Default is -1 meaning that the height is not fixed

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be compressed as JPG. Default is False

publish(img: numpy.ndarray)[source]

Publish the image to the middleware.

Parameters:

img – np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.publishers.zeromq.ZeroMQAudioChunkPublisher(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, multi_threaded: bool = False, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ZeroMQNativeObjectPublisher

__init__(name: str, out_topic: str, carrier: str = 'tcp', should_wait: bool = True, multi_threaded: bool = False, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunkPublisher using the ZeroMQ message construct assuming a numpy array as input.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • multi_threaded – bool: Whether to use a separate socket for each thread. Default is False

  • channels – int: Number of channels. Default is 1

  • rate – int: Sampling rate. Default is 44100

  • chunk – int: Chunk size. Default is -1 meaning that the chunk size is not fixed

publish(aud: Tuple[numpy.ndarray, int])[source]

Publish the audio chunk to the middleware.

Parameters:

aud – Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.publishers.zeromq.ZeroMQPropertiesPublisher(name, out_topic, **kwargs)[source]

Bases: ZeroMQPublisher

__init__(name, out_topic, **kwargs)[source]

Initialize the publisher and start the proxy broker if necessary.

Parameters:
  • name – str: Name of the publisher

  • out_topic – str: Name of the output topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether to wait for at least one listener before unblocking the script. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_pub_port – int: Port of the socket for publishing. Default is 5555

  • socket_sub_port – int: Port of the socket for subscribing. Default is 5556

  • start_proxy_broker – bool: Whether to start a proxy broker. Default is True

  • proxy_broker_spawn – str: Whether to spawn the proxy broker as a process or thread. Default is ‘process’

  • pubsub_monitor_topic – str: Topic to monitor the connections. Default is ‘ZEROMQ/CONNECTIONS’

  • pubsub_monitor_listener_spawn – str: Whether to spawn the PUB/SUB monitor listener as a process or thread. Default is ‘process’

  • zeromq_kwargs – dict: Additional kwargs for the ZeroMQ Pub/Sub middleware

  • kwargs – dict: Additional kwargs for the publisher

Module contents

class wrapyfi.publishers.FallbackPublisher(name: str, out_topic: str, carrier: str = '', should_wait: bool = True, missing_middleware_object: str = '', **kwargs)[source]

Bases: Publisher

__init__(name: str, out_topic: str, carrier: str = '', should_wait: bool = True, missing_middleware_object: str = '', **kwargs)[source]

Initialize the Publisher.

Parameters:
  • name – str: The name of the publisher

  • out_topic – str: The name of the output topic

  • carrier – str: The name of the carrier to use

  • should_wait – bool: Whether to wait for the publisher to be established or not

establish(repeats: int = -1, **kwargs)[source]

Establish the publisher.

publish(obj)[source]

Publish an object.

close()[source]

Close the connection.