Source code for wrapyfi.publishers.ros2

import logging
import sys
import json
import time
import os
from typing import Optional, Tuple, Union

import numpy as np
import rclpy
from rclpy.node import Node
import std_msgs.msg
import sensor_msgs.msg

from wrapyfi.connect.publishers import Publisher, Publishers, PublisherWatchDog
from wrapyfi.middlewares.ros2 import ROS2Middleware
from wrapyfi.utils.serialization_encoders import JsonEncoder
from wrapyfi.utils.image_encoders import JpegEncoder


QUEUE_SIZE = int(os.environ.get("WRAPYFI_ROS2_QUEUE_SIZE", 5))
WATCHDOG_POLL_REPEAT = None


[docs] class ROS2Publisher(Publisher, Node):
[docs] def __init__( self, name: str, out_topic: str, should_wait: bool = True, queue_size: int = QUEUE_SIZE, ros2_kwargs: Optional[dict] = None, **kwargs, ): """ Initialize the publisher. :param name: str: Name of the publisher :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True :param queue_size: int: Queue size for the publisher. Default is 5 :param ros2_kwargs: dict: Additional kwargs for the ROS 2 middleware :param kwargs: dict: Additional kwargs for the publisher """ carrier = "tcp" if "carrier" in kwargs and kwargs["carrier"] not in ["", None]: logging.warning( "[ROS 2] ROS 2 currently does not support explicit carrier setting for PUB/SUB pattern. Using TCP." ) if "carrier" in kwargs: del kwargs["carrier"] ROS2Middleware.activate(**ros2_kwargs or {}) Publisher.__init__( self, name, out_topic, carrier=carrier, should_wait=should_wait, **kwargs ) Node.__init__(self, name + str(hex(id(self)))) self.queue_size = queue_size
[docs] def await_connection( self, publisher, out_topic: Optional[str] = None, repeats: Optional[int] = None ): """ Wait for at least one subscriber to connect to the publisher. :param publisher: rclpy.publisher.Publisher: Publisher to await connection to :param out_topic: str: Name of the output topic :param repeats: int: Number of repeats to await connection. None for infinite. Default is None :return: bool: True if connection established, False otherwise """ connected = False if out_topic is None: out_topic = self.out_topic logging.info(f"[ROS 2] Waiting for topic subscriber: {out_topic}") if repeats is None: repeats = -1 if self.should_wait else 1 while repeats > 0 or repeats <= -1: repeats -= 1 connected = publisher.get_subscription_count() > 0 if connected: logging.info(f"[ROS 2] Topic subscriber connected: {out_topic}") break time.sleep(0.02) return connected
[docs] def close(self): """ Close the publisher """ if hasattr(self, "_publisher") and self._publisher: if self._publisher is not None: self.destroy_node()
def __del__(self): self.close()
[docs] @Publishers.register("NativeObject", "ros2") class ROS2NativeObjectPublisher(ROS2Publisher):
[docs] def __init__( self, name, out_topic: str, should_wait: bool = True, queue_size: int = QUEUE_SIZE, serializer_kwargs: Optional[dict] = None, **kwargs, ): """ The NativeObject publisher using the ROS 2 String message assuming a combination of python native objects and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string. :param name: str: Name of the publisher :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True :param queue_size: int: Queue size for the publisher. Default is 5 :param serializer_kwargs: dict: Additional kwargs for the serializer """ super().__init__( name, out_topic, should_wait=should_wait, queue_size=queue_size, **kwargs ) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._serializer_kwargs = serializer_kwargs or {} self._publisher = None if not self.should_wait: PublisherWatchDog().add_publisher(self)
[docs] def establish(self, repeats: Optional[int] = None, **kwargs): """ Establish the connection. :param repeats: int: Number of repeats to await connection. None for infinite. Default is None :return: bool: True if connection established, False otherwise """ self._publisher = self.create_publisher( std_msgs.msg.String, self.out_topic, qos_profile=self.queue_size ) established = self.await_connection(self._publisher, repeats=repeats) return self.check_establishment(established)
[docs] def publish(self, obj): """ Publish the object to the middleware :param obj: object: Object to publish """ if not self.established: established = self.establish(repeats=WATCHDOG_POLL_REPEAT) if not established: return else: time.sleep(0.2) obj_str = json.dumps( obj, cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) obj_str_msg = std_msgs.msg.String() obj_str_msg.data = obj_str self._publisher.publish(obj_str_msg)
[docs] @Publishers.register("Image", "ros2") class ROS2ImagePublisher(ROS2Publisher):
[docs] def __init__( self, name: str, out_topic: str, should_wait: bool = True, queue_size: int = QUEUE_SIZE, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: Union[bool, dict] = False, **kwargs, ): """ The ImagePublisher using the ROS 2 Image message assuming a numpy array as input. :param name: str: Name of the publisher :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True :param queue_size: int: Queue size for the publisher. Default is 5 :param width: int: Width of the image. Default is -1 meaning that the width is not fixed :param height: int: Height of the image. Default is -1 meaning that the height is not fixed :param rgb: bool: True if the image is RGB, False if it is grayscale. Default is True :param fp: bool: True if the image is floating point, False if it is integer. Default is False :param jpg: Union[bool, dict]: If True, compress as JPG with default settings. If a dict, pass arguments to JpegEncoder. Default is False """ super().__init__( name, out_topic, should_wait=should_wait, queue_size=queue_size, **kwargs ) self.width = width self.height = height self.rgb = rgb self.fp = fp self.jpg = jpg if self.fp: self._encoding = "32FC3" if self.rgb else "32FC1" self._type = np.float32 else: self._encoding = "bgr8" if self.rgb else "mono8" self._type = np.uint8 if self.jpg: self._encoding = "jpeg" self._type = np.uint8 self._image_encoder = JpegEncoder( **(self.jpg if isinstance(self.jpg, dict) else {}) ) self._publisher = None if not self.should_wait: PublisherWatchDog().add_publisher(self)
[docs] def establish(self, repeats: Optional[int] = None, **kwargs): """ Establish the connection. :param repeats: int: Number of repeats to await connection. None for infinite. Default is None :return: bool: True if connection established, False otherwise """ if self.jpg: self._publisher = self.create_publisher( sensor_msgs.msg.CompressedImage, self.out_topic, qos_profile=self.queue_size, ) else: self._publisher = self.create_publisher( sensor_msgs.msg.Image, self.out_topic, qos_profile=self.queue_size ) established = self.await_connection(self._publisher) return self.check_establishment(established)
[docs] def publish(self, img: np.ndarray): """ Publish the image to the middleware. :param img: np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels] """ if img is None: return if not self.established: established = self.establish(repeats=WATCHDOG_POLL_REPEAT) if not established: return else: time.sleep(0.2) if ( 0 < self.width != img.shape[1] or 0 < self.height != img.shape[0] or not ( (img.ndim == 2 and not self.rgb) or (img.ndim == 3 and self.rgb and img.shape[2] == 3) ) ): raise ValueError("Incorrect image shape for publisher") img = np.require(img, dtype=self._type, requirements="C") if self.jpg: img_msg = sensor_msgs.msg.CompressedImage() img_msg.header.stamp = rclpy.clock.Clock().now().to_msg() img_msg.format = "jpeg" img_msg.data = self._image_encoder.encode_jpg_image( img, return_numpy=True ).tobytes() else: img_msg = sensor_msgs.msg.Image() img_msg.header.stamp = self.get_clock().now().to_msg() img_msg.height = img.shape[0] img_msg.width = img.shape[1] img_msg.encoding = self._encoding img_msg.is_bigendian = img.dtype.byteorder == ">" or ( img.dtype.byteorder == "=" and sys.byteorder == "big" ) img_msg.step = img.strides[0] img_msg.data = img.tobytes() self._publisher.publish(img_msg)
[docs] @Publishers.register("AudioChunk", "ros2") class ROS2AudioChunkPublisher(ROS2Publisher):
[docs] def __init__( self, name: str, out_topic: str, should_wait: bool = True, queue_size: int = QUEUE_SIZE, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs, ): """ The AudioChunkPublisher using the ROS 2 Audio message assuming a numpy array as input. :param name: str: Name of the publisher :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True :param queue_size: int: Queue size for the publisher. Default is 5 :param channels: int: Number of channels. Default is 1 :param rate: int: Sampling rate. Default is 44100 :param chunk: int: Chunk size. Default is -1 meaning that the chunk size is not fixed """ super().__init__( name, out_topic, should_wait=should_wait, queue_size=queue_size, **kwargs ) self.channels = channels self.rate = rate self.chunk = chunk self._publisher = self._sound_msg = None if not self.should_wait: PublisherWatchDog().add_publisher(self)
[docs] def establish(self, repeats: Optional[int] = None, **kwargs): """ Establish the connection. :param repeats: int: Number of repeats to await connection. None for infinite. Default is None :return: bool: True if connection established, False otherwise """ try: from wrapyfi_ros2_interfaces.msg import ROS2AudioMessage except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2AudioMessage. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) self._publisher = self.create_publisher( ROS2AudioMessage, self.out_topic, qos_profile=self.queue_size ) self._sound_msg = ROS2AudioMessage() established = self.await_connection(self._publisher) return self.check_establishment(established)
[docs] def publish(self, aud: Tuple[np.ndarray, int]): """ Publish the audio chunk to the middleware. :param aud: Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate]) """ if not self.established: established = self.establish(repeats=WATCHDOG_POLL_REPEAT) if not established: return else: time.sleep(0.2) aud, rate = aud if aud is None: return if 0 < self.rate != rate: raise ValueError("Incorrect audio rate for publisher") chunk, channels = aud.shape if len(aud.shape) > 1 else (aud.shape[0], 1) self.chunk = chunk if self.chunk == -1 else self.chunk self.channels = channels if self.channels == -1 else self.channels if 0 < self.chunk != chunk or 0 < self.channels != channels: raise ValueError("Incorrect audio shape for publisher") aud = np.require(aud, dtype=np.float32, requirements="C") aud_msg = self._sound_msg aud_msg.header.stamp = self.get_clock().now().to_msg() aud_msg.chunk_size = chunk aud_msg.channels = channels aud_msg.sample_rate = rate aud_msg.is_bigendian = aud.dtype.byteorder == ">" or ( aud.dtype.byteorder == "=" and sys.byteorder == "big" ) aud_msg.encoding = "S16BE" if aud_msg.is_bigendian else "S16LE" aud_msg.step = aud.strides[0] aud_msg.data = aud.tobytes() # (aud * 32767.0).tobytes() self._publisher.publish(aud_msg)
[docs] @Publishers.register("Properties", "ros2") class ROS2PropertiesPublisher(ROS2Publisher):
[docs] def __init__(self, name, out_topic, **kwargs): super().__init__(name, out_topic, **kwargs) raise NotImplementedError
[docs] @Publishers.register("ROS2Message", "ros2") class ROS2MessagePublisher(ROS2Publisher):
[docs] def __init__( self, name: str, out_topic: str, should_wait: bool = True, queue_size: int = QUEUE_SIZE, **kwargs, ): """ The ROS2MessagePublisher using the ROS 2 message type determined dynamically. :param name: str: Name of the publisher :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True :param queue_size: int: Queue size for the publisher. Default is 5 """ super().__init__( name, out_topic, should_wait=should_wait, queue_size=queue_size, **kwargs ) self._publisher = None self._msg_type = None if not self.should_wait: PublisherWatchDog().add_publisher(self)
[docs] def get_message_type(self, msg): """ Get the type of a specific message. :param msg: ROS 2 message object :return: type: The type of the provided message """ return type(msg)
[docs] def establish(self, msg, repeats: Optional[int] = None, **kwargs): """ Establish the connection using the provided message to determine the type. :param msg: ROS2Message: Message to determine the type. :param repeats: int: Number of repeats to await connection. None for infinite. Default is None :return: bool: True if connection established, False otherwise """ self._msg_type = self.get_message_type(msg) self._publisher = self.create_publisher( self._msg_type, self.out_topic, qos_profile=self.queue_size ) established = self.await_connection(self._publisher) return self.check_establishment(established)
[docs] def publish(self, msg): """ Publish the data to the middleware. :param msg: ROS2Message: Message to publish. This should be formatted according to the expected message type. """ if not self._publisher: self.establish(msg) if not self.established: established = self.establish(msg, repeats=WATCHDOG_POLL_REPEAT) if not established: return else: time.sleep(0.2) self._publisher.publish(msg)