Source code for wrapyfi.servers.ros2

import logging
import sys
import json
import threading
import queue
from typing import Optional, Tuple, Union

import numpy as np
import rclpy
from rclpy.node import Node

from wrapyfi.connect.servers import Server, Servers
from wrapyfi.middlewares.ros2 import ROS2Middleware
from wrapyfi.utils.serialization_encoders import JsonEncoder, JsonDecodeHook
from wrapyfi.utils.image_encoders import JpegEncoder


[docs] class ROS2Server(Server, Node):
[docs] def __init__( self, name: str, out_topic: str, ros2_kwargs: Optional[dict] = None, **kwargs ): """ Initialize the server. :param name: str: Name of the server :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param ros2_kwargs: dict: Additional kwargs for the ROS 2 middleware :param kwargs: dict: Additional kwargs for the server """ carrier = "tcp" if "carrier" in kwargs and kwargs["carrier"] not in ["", None]: logging.warning( "[ROS 2] ROS 2 currently does not support explicit carrier setting for REQ/REP pattern. Using TCP." ) if "carrier" in kwargs: del kwargs["carrier"] ROS2Middleware.activate(**ros2_kwargs or {}) Server.__init__(self, name, out_topic, carrier=carrier, **kwargs) Node.__init__(self, name + str(hex(id(self))))
[docs] def close(self): """ Close the server. """ if hasattr(self, "_server") and self._server: if self._server is not None: self.destroy_node() if hasattr(self, "_background_callback") and self._background_callback: if self._background_callback is not None: self._background_callback.join()
def __del__(self): self.close()
[docs] @Servers.register("NativeObject", "ros2") class ROS2NativeObjectServer(ROS2Server): SEND_QUEUE = queue.Queue(maxsize=1) RECEIVE_QUEUE = queue.Queue(maxsize=1)
[docs] def __init__( self, name: str, out_topic: str, serializer_kwargs: Optional[dict] = None, deserializer_kwargs: Optional[dict] = None, **kwargs, ): """ Specific server handling native Python objects, serializing them to JSON strings for transmission. :param name: str: Name of the server :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param serializer_kwargs: dict: Additional kwargs for the serializer :param deserializer_kwargs: dict: Additional kwargs for the deserializer """ super().__init__(name, out_topic, **kwargs) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._serializer_kwargs = serializer_kwargs or {} self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook self._deserializer_kwargs = deserializer_kwargs or {} self._server = None
[docs] def establish(self): """ Establish the connection to the server """ try: from wrapyfi_ros2_interfaces.srv import ROS2NativeObjectService except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2NativeObjectService. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) self._server = self.create_service( ROS2NativeObjectService, self.out_topic, self._service_callback ) self._req_msg = ROS2NativeObjectService.Request() self._rep_msg = ROS2NativeObjectService.Response() self.established = True
[docs] def await_request(self, *args, **kwargs): """ Await and deserialize the client's request, returning the extracted arguments and keyword arguments. The method blocks until a message is received, then attempts to deserialize it using the configured JSON decoder hook, returning the extracted arguments and keyword arguments. :return: Tuple[list, dict]: A tuple containing two items: - A list of arguments extracted from the received message - A dictionary of keyword arguments extracted from the received message """ if not self.established: self.establish() try: self._background_callback = threading.Thread( name="ros2_server", target=rclpy.spin_once, args=(self,), kwargs={} ) self._background_callback.setDaemon(True) self._background_callback.start() request = ROS2NativeObjectServer.RECEIVE_QUEUE.get(block=True) [args, kwargs] = json.loads( request.request, object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) return args, kwargs except Exception as e: logging.error("[ROS 2] Service call failed %s" % e) return [], {}
@staticmethod def _service_callback(request, _response): """ Callback for the ROS 2 service. :param request: ROS2NativeObjectService.Request: The request message :param _response: ROS2NativeObjectService.Response: The response message """ ROS2NativeObjectServer.RECEIVE_QUEUE.put(request) return ROS2NativeObjectServer.SEND_QUEUE.get(block=True)
[docs] def reply(self, obj): """ Serialize the provided Python object to a JSON string and send it as a reply to the client. The method uses the configured JSON encoder for serialization before sending the resultant string to the client. :param obj: Any: The Python object to be serialized and sent """ try: obj_str = json.dumps( obj, cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) self._rep_msg.response = obj_str ROS2NativeObjectServer.SEND_QUEUE.put(self._rep_msg, block=False) except queue.Full: logging.warning( f"[ROS 2] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" )
[docs] @Servers.register("Image", "ros2") class ROS2ImageServer(ROS2Server): SEND_QUEUE = queue.Queue(maxsize=1) RECEIVE_QUEUE = queue.Queue(maxsize=1)
[docs] def __init__( self, name: str, out_topic: str, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: Union[bool, dict] = False, deserializer_kwargs: Optional[dict] = None, **kwargs, ): """ Specific server handling native Python objects, serializing them to JSON strings for transmission. :param name: str: Name of the server :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param width: int: Width of the image. Default is -1 (use the width of the received image) :param height: int: Height of the image. Default is -1 (use the height of the received image) :param rgb: bool: True if the image is RGB, False if it is grayscale. Default is True :param fp: bool: True if the image is floating point, False if it is integer. Default is False :param jpg: Union[bool, dict]: If True, compress as JPG with default settings. If a dict, pass arguments to JpegEncoder. Default is False :param deserializer_kwargs: dict: Additional kwargs for the deserializer """ super().__init__(name, out_topic, **kwargs) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook self._deserializer_kwargs = deserializer_kwargs or {} self.width = width self.height = height self.rgb = rgb self.fp = fp self.jpg = jpg if self.fp: self._encoding = "32FC3" if self.rgb else "32FC1" self._type = np.float32 else: self._encoding = "bgr8" if self.rgb else "mono8" self._type = np.uint8 if self.jpg: self._encoding = "jpeg" self._type = np.uint8 self._image_encoder = JpegEncoder( **(self.jpg if isinstance(self.jpg, dict) else {}) ) self._server = None
[docs] def establish(self): """ Establish the connection to the server. """ try: from wrapyfi_ros2_interfaces.srv import ( ROS2ImageService, ROS2CompressedImageService, ) except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2NativeObjectService. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) if self.jpg: self._server = self.create_service( ROS2CompressedImageService, self.out_topic, self._service_callback ) self._req_msg = ROS2CompressedImageService.Request() self._rep_msg = ROS2CompressedImageService.Response() else: self._server = self.create_service( ROS2ImageService, self.out_topic, self._service_callback ) self._req_msg = ROS2ImageService.Request() self._rep_msg = ROS2ImageService.Response() self.established = True
[docs] def await_request(self, *args, **kwargs): """ Await and deserialize the client's request, returning the extracted arguments and keyword arguments. The method blocks until a message is received, then attempts to deserialize it using the configured JSON decoder hook, returning the extracted arguments and keyword arguments. :return: Tuple[list, dict]: A tuple containing two items: - A list of arguments extracted from the received message - A dictionary of keyword arguments extracted from the received message """ if not self.established: self.establish() try: self._background_callback = threading.Thread( name="ros2_server", target=rclpy.spin_once, args=(self,), kwargs={} ) self._background_callback.setDaemon(True) self._background_callback.start() request = ROS2ImageServer.RECEIVE_QUEUE.get(block=True) [args, kwargs] = json.loads( request.request, object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) return args, kwargs except Exception as e: logging.error("[ROS 2] Service call failed %s" % e) return [], {}
@staticmethod def _service_callback(request, _response): """ Callback for the ROS 2 service. :param request: ROS2NativeObjectService.Request: The request message :param _response: ROS2NativeObjectService.Response: The response message :return: ROS2NativeObjectService.Response: The response message """ ROS2ImageServer.RECEIVE_QUEUE.put(request) return ROS2ImageServer.SEND_QUEUE.get(block=True)
[docs] def reply(self, img: np.ndarray): """ Serialize the provided image data and send it as a reply to the client. :param img: np.ndarray: Image to send formatted as a cv2 image - np.ndarray[img_height, img_width, channels] """ try: if ( 0 < self.width != img.shape[1] or 0 < self.height != img.shape[0] or not ( (img.ndim == 2 and not self.rgb) or (img.ndim == 3 and self.rgb and img.shape[2] == 3) ) ): raise ValueError("Incorrect image shape for publisher") img = np.require(img, dtype=self._type, requirements="C") img_msg = self._rep_msg.response if self.jpg: img_msg.header.stamp = rclpy.clock.Clock().now().to_msg() img_msg.format = "jpeg" img_msg.data = self._image_encoder.encode_jpg_image( img, return_numpy=True ).tobytes() else: img_msg.header.stamp = rclpy.clock.Clock().now().to_msg() img_msg.height = img.shape[0] img_msg.width = img.shape[1] img_msg.encoding = self._encoding img_msg.is_bigendian = img.dtype.byteorder == ">" or ( img.dtype.byteorder == "=" and sys.byteorder == "big" ) img_msg.step = img.strides[0] img_msg.data = img.tobytes() self._rep_msg.response = img_msg ROS2ImageServer.SEND_QUEUE.put(self._rep_msg, block=False) except queue.Full: logging.warning( f"[ROS 2] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" )
[docs] @Servers.register("AudioChunk", "ros2") class ROS2AudioChunkServer(ROS2Server): SEND_QUEUE = queue.Queue(maxsize=1) RECEIVE_QUEUE = queue.Queue(maxsize=1)
[docs] def __init__( self, name: str, out_topic: str, channels: int = 1, rate: int = 44100, chunk: int = -1, deserializer_kwargs: Optional[dict] = None, **kwargs, ): """ Specific server handling audio data as numpy arrays. :param name: str: Name of the server :param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic') :param channels: int: Number of channels in the audio. Default is 1 :param rate: int: Sampling rate of the audio. Default is 44100 :param chunk: int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio) :param deserializer_kwargs: dict: Additional kwargs for the deserializer """ super().__init__(name, out_topic, **kwargs) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook self._deserializer_kwargs = deserializer_kwargs or {} self.channels = channels self.rate = rate self.chunk = chunk
[docs] def establish(self): """ Establish the connection to the server. """ try: from wrapyfi_ros2_interfaces.srv import ROS2AudioService except ImportError: import wrapyfi logging.error( "[ROS 2] Could not import ROS2AudioService. " "Make sure the ROS 2 services in wrapyfi_extensions/wrapyfi_ros2_interfaces are compiled. " "Refer to the documentation for more information: \n" + wrapyfi.__doc__ + "ros2_interfaces_lnk.html" ) sys.exit(1) self._server = self.create_service( ROS2AudioService, self.out_topic, self._service_callback ) self._req_msg = ROS2AudioService.Request() self._rep_msg = ROS2AudioService.Response() self.established = True
[docs] def await_request(self, *args, **kwargs): """ Await and deserialize the client's request, returning the extracted arguments and keyword arguments. The method blocks until a message is received, then attempts to deserialize it using the configured JSON decoder hook, returning the extracted arguments and keyword arguments. :return: Tuple[list, dict]: A tuple containing two items: - A list of arguments extracted from the received message - A dictionary of keyword arguments extracted from the received message """ if not self.established: self.establish() try: self._background_callback = threading.Thread( name="ros2_server", target=rclpy.spin_once, args=(self,), kwargs={} ) self._background_callback.setDaemon(True) self._background_callback.start() request = ROS2AudioChunkServer.RECEIVE_QUEUE.get(block=True) [args, kwargs] = json.loads( request.request, object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) return args, kwargs except Exception as e: logging.error("[ROS 2] Service call failed %s" % e) return [], {}
@staticmethod def _service_callback(request, _response): """ Callback for the ROS 2 service. :param request: ROS2NativeObjectService.Request: The request message :param _response: ROS2NativeObjectService.Response: The response message :return: ROS2NativeObjectService.Response: The response message """ ROS2AudioChunkServer.RECEIVE_QUEUE.put(request) return ROS2AudioChunkServer.SEND_QUEUE.get(block=True)
[docs] def reply(self, aud: Tuple[np.ndarray, int]): """ Serialize the provided audio data and send it as a reply to the client. :param aud: Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate]) """ try: aud, rate = aud if aud is None: return if 0 < self.rate != rate: raise ValueError("Incorrect audio rate for publisher") chunk, channels = aud.shape if len(aud.shape) > 1 else (aud.shape[0], 1) self.chunk = chunk if self.chunk == -1 else self.chunk self.channels = channels if self.channels == -1 else self.channels if 0 < self.chunk != chunk or 0 < self.channels != channels: raise ValueError("Incorrect audio shape for publisher") aud = np.require(aud, dtype=np.float32, requirements="C") aud_msg = self._rep_msg.response aud_msg.header.stamp = self.get_clock().now().to_msg() aud_msg.chunk_size = chunk aud_msg.channels = channels aud_msg.sample_rate = rate aud_msg.is_bigendian = aud.dtype.byteorder == ">" or ( aud.dtype.byteorder == "=" and sys.byteorder == "big" ) aud_msg.encoding = "S16BE" if aud_msg.is_bigendian else "S16LE" aud_msg.step = aud.strides[0] aud_msg.data = aud.tobytes() # (aud * 32767.0).tobytes() self._rep_msg.response = aud_msg ROS2AudioChunkServer.SEND_QUEUE.put(self._rep_msg, block=False) except queue.Full: logging.warning( f"[ROS 2] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" )