Source code for wrapyfi.clients.ros2

import logging
import sys
import json
import queue
from typing import Optional, Any, Union

import numpy as np
import cv2
import rclpy
from rclpy.node import Node

from wrapyfi.connect.clients import Client, Clients
from wrapyfi.middlewares.ros2 import ROS2Middleware
from wrapyfi.utils.serialization_encoders import JsonEncoder, JsonDecodeHook


[docs] class ROS2Client(Client, Node):
[docs] def __init__( self, name: str, in_topic: str, ros2_kwargs: Optional[dict] = None, **kwargs ): """ Initialize the client. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param ros2_kwargs: dict: Additional kwargs for the ROS 2 middleware :param kwargs: dict: Additional kwargs for the client """ carrier = "tcp" if "carrier" in kwargs and kwargs["carrier"] not in ["", None]: logging.warning( "[ROS 2] ROS 2 currently does not support explicit carrier setting for PUB/SUB pattern. Using TCP." ) if "carrier" in kwargs: del kwargs["carrier"] ROS2Middleware.activate(**ros2_kwargs or {}) Client.__init__(self, name, in_topic, carrier=carrier, **kwargs) Node.__init__(self, name + str(hex(id(self))))
[docs] def close(self): """ Close the client. """ if hasattr(self, "_client") and self._client: if self._client is not None: self.destroy_node()
def __del__(self): self.close()
[docs] @Clients.register("NativeObject", "ros2") class ROS2NativeObjectClient(ROS2Client):
[docs] def __init__( self, name: str, in_topic: str, serializer_kwargs: Optional[dict] = None, deserializer_kwargs: Optional[dict] = None, **kwargs, ): """ The NativeObject listener using the ROS 2 String message assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a Python object. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param serializer_kwargs: dict: Additional kwargs for the serializer :param deserializer_kwargs: dict: Additional kwargs for the deserializer """ super().__init__(name, in_topic, **kwargs) self._client = None self._queue = queue.Queue(maxsize=1) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._serializer_kwargs = serializer_kwargs or {} self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook self._deserializer_kwargs = deserializer_kwargs or {}
[docs] def establish(self): """ Establish the client's connection to the ROS 2 service. """ try: from wrapyfi_ros2_interfaces.srv import ROS2NativeObjectService except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2NativeObjectService. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) self._client = self.create_client(ROS2NativeObjectService, self.in_topic) self._req_msg = ROS2NativeObjectService.Request() while not self._client.wait_for_service(timeout_sec=1.0): logging.info("[ROS 2] Service not available, waiting again...") self.established = True
[docs] def request(self, *args, **kwargs): """ Send a request to the ROS 2 service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request :return: Any: The response from the ROS 2 service """ if not self.established: self.establish() try: self._request(*args, **kwargs) except Exception as e: logging.error("[ROS 2] Service call failed: %s" % e) return self._await_reply()
def _request(self, *args, **kwargs): """ Internal method to send a request to the ROS 2 service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request """ # transmit args to server args_str = json.dumps( [args, kwargs], cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) self._req_msg.request = args_str future = self._client.call_async(self._req_msg) # receive message from server while rclpy.ok(): rclpy.spin_once(self) if future.done(): try: msg = future.result() obj = json.loads( msg.response, object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) self._queue.put(obj, block=False) except Exception as e: logging.error("[ROS 2] Service call failed: %s" % e) break def _await_reply(self) -> Any: """ Wait for and return the reply from the ROS 2 service. :return: Any: The response from the ROS 2 service """ try: reply = self._queue.get(block=True) return reply except queue.Full: logging.warning( f"[ROS 2] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" ) return None
[docs] @Clients.register("Image", "ros2") class ROS2ImageClient(ROS2Client):
[docs] def __init__( self, name: str, in_topic: str, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: Union[bool, dict] = False, serializer_kwargs: Optional[dict] = None, **kwargs, ): """ The Image client using the ROS 2 Image message parsed to a numpy array. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param width: int: The width of the image. Default is -1 (use the width of the received image) :param height: int: The height of the image. Default is -1 (use the height of the received image) :param rgb: bool: Whether the image is RGB. Default is True :param fp: bool: Whether to utilize floating-point precision. Default is False :param jpg: bool: True if the image should be decompressed from JPG. Default is False :param serializer_kwargs: dict: Additional kwargs for the serializer """ super().__init__(name, in_topic, **kwargs) self._client = None self._queue = queue.Queue(maxsize=1) self.width = width self.height = height self.rgb = rgb self.fp = fp self.jpg = jpg if self.fp: self._encoding = "32FC3" if self.rgb else "32FC1" self._type = np.float32 else: self._encoding = "bgr8" if self.rgb else "mono8" self._type = np.uint8 self._pixel_bytes = (3 if self.rgb else 1) * np.dtype(self._type).itemsize self._client = None self._queue = queue.Queue(maxsize=1) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._serializer_kwargs = serializer_kwargs or {}
[docs] def establish(self): """ Establish the client's connection to the ROS 2 service. """ try: from wrapyfi_ros2_interfaces.srv import ( ROS2ImageService, ROS2CompressedImageService, ) except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2ImageService. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) if self.jpg: self._client = self.create_client(ROS2CompressedImageService, self.in_topic) self._req_msg = ROS2CompressedImageService.Request() else: self._client = self.create_client(ROS2ImageService, self.in_topic) self._req_msg = ROS2ImageService.Request() while not self._client.wait_for_service(timeout_sec=1.0): logging.info("[ROS 2] Service not available, waiting again...") self.established = True
[docs] def request(self, *args, **kwargs): """ Send a request to the ROS 2 service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request :return: Any: The response from the ROS 2 service """ if not self.established: self.establish() try: self._request(*args, **kwargs) except Exception as e: logging.error("[ROS 2] Service call failed: %s" % e) return self._await_reply()
def _request(self, *args, **kwargs): """ Internal method to send a request to the ROS 2 service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request """ # transmit args to server args_str = json.dumps( [args, kwargs], cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) self._req_msg.request = args_str future = self._client.call_async(self._req_msg) # receive message from server while rclpy.ok(): rclpy.spin_once(self) if future.done(): try: msg = future.result() data = msg.response if self.jpg: self._queue.put((data.format, data.data), block=False) else: self._queue.put( ( data.height, data.width, data.encoding, data.is_bigendian, data.data, ), block=False, ) except Exception as e: logging.error("[ROS 2] Service call failed: %s" % e) break def _await_reply(self): """ Wait for and return the reply from the ROS 2 service. :return: np.array: The received image from the ROS 2 service """ try: if self.jpg: format, data = self._queue.get(block=True) if format != "jpeg": raise ValueError(f"Unsupported image format: {format}") if self.rgb: img = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR) else: img = cv2.imdecode( np.frombuffer(data, np.uint8), cv2.IMREAD_GRAYSCALE ) else: height, width, encoding, is_bigendian, data = self._queue.get( block=True ) if encoding != self._encoding: raise ValueError("Incorrect encoding for listener") if ( 0 < self.width != width or 0 < self.height != height or len(data) != height * width * self._pixel_bytes ): raise ValueError("Incorrect image shape for listener") img = np.frombuffer( data, dtype=np.dtype(self._type).newbyteorder( ">" if is_bigendian else "<" ), ).reshape((height, width, -1)) if img.shape[2] == 1: img = img.squeeze(axis=2) return img except queue.Full: logging.warning( f"[ROS 2] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" ) return None
[docs] @Clients.register("AudioChunk", "ros2") class ROS2AudioChunkClient(ROS2Client):
[docs] def __init__( self, name: str, in_topic: str, channels: int = 1, rate: int = 44100, chunk: int = -1, serializer_kwargs: Optional[dict] = None, **kwargs, ): """ The AudioChunk client using the ROS 2 Audio message parsed to a numpy array. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param channels: int: Number of channels in the audio. Default is 1 :param rate: int: Sampling rate of the audio. Default is 44100 :param chunk: int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio) :param serializer_kwargs: dict: Additional kwargs for the serializer """ super().__init__(name, in_topic, **kwargs) self._client = None self._queue = queue.Queue(maxsize=1) self.channels = channels self.rate = rate self.chunk = chunk self._client = None self._queue = queue.Queue(maxsize=1) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._serializer_kwargs = serializer_kwargs or {}
[docs] def establish(self): """ Establish the client's connection to the ROS 2 service. """ try: from wrapyfi_ros2_interfaces.srv import ROS2AudioService except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2AudioService. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) self._client = self.create_client(ROS2AudioService, self.in_topic) self._req_msg = ROS2AudioService.Request() while not self._client.wait_for_service(timeout_sec=1.0): logging.info("[ROS 2] Service not available, waiting again...") self.established = True
[docs] def request(self, *args, **kwargs): """ Send a request to the ROS 2 service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request :return: Any: The response from the ROS 2 service """ if not self.established: self.establish() try: self._request(*args, **kwargs) except Exception as e: logging.error("[ROS 2] Service call failed: %s" % e) return self._await_reply()
def _request(self, *args, **kwargs): """ Internal method to send a request to the ROS 2 service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request """ args_str = json.dumps( [args, kwargs], cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) self._req_msg.request = args_str future = self._client.call_async(self._req_msg) while rclpy.ok(): rclpy.spin_once(self) if future.done(): try: msg = future.result() data = msg.response self._queue.put( ( data.chunk_size, data.channels, data.sample_rate, data.encoding, data.is_bigendian, data.data, ), block=False, ) except Exception as e: logging.error("[ROS 2] Service call failed: %s" % e) break def _await_reply(self): """ Wait for and return the reply from the ROS 2 service. :return: Tuple[np.ndarray, int]: The received message as a numpy array formatted as (np.ndarray[audio_chunk, channels], int[samplerate]) """ try: chunk, channels, rate, encoding, is_bigendian, data = self._queue.get( block=False ) if 0 < self.rate != rate: raise ValueError("Incorrect audio rate for publisher") if encoding not in ["S16LE", "S16BE"]: raise ValueError("Incorrect encoding for listener") if ( 0 < self.chunk != chunk or self.channels != channels or len(data) != chunk * channels * 4 ): raise ValueError("Incorrect audio shape for listener") aud = np.frombuffer( data, dtype=np.dtype(np.float32).newbyteorder(">" if is_bigendian else "<"), ).reshape((chunk, channels)) # aud = aud / 32767.0 return aud, rate except queue.Full: logging.warning( f"[ROS 2] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" ) return None