Source code for wrapyfi.clients.yarp

import logging
import json
import time
from typing import Optional, Literal
import queue

import numpy as np
import yarp

from wrapyfi.connect.clients import Client, Clients
from wrapyfi.middlewares.yarp import YarpMiddleware
from wrapyfi.utils.serialization_encoders import JsonEncoder, JsonDecodeHook


[docs] class YarpClient(Client):
[docs] def __init__( self, name: str, in_topic: str, carrier: Literal["tcp", "udp", "mcast"] = "tcp", persistent: bool = True, yarp_kwargs: Optional[dict] = None, **kwargs, ): """ Initialize the client. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp' :param persistent: bool: Whether to keep the service connection alive across multiple service calls. Default is True :param yarp_kwargs: dict: Additional kwargs for the Yarp middleware :param kwargs: dict: Additional kwargs for the client """ super().__init__(name, in_topic, carrier=carrier, **kwargs) YarpMiddleware.activate(**yarp_kwargs or {}) self.persistent = persistent
[docs] def close(self): """ Close the client. """ if hasattr(self, "_port") and self._port: if self._port is not None: self._port.close()
def __del__(self): self.close()
[docs] @Clients.register("NativeObject", "yarp") class YarpNativeObjectClient(YarpClient):
[docs] def __init__( self, name: str, in_topic: str, carrier: Literal["tcp", "udp", "mcast"] = "tcp", persistent: bool = True, serializer_kwargs: Optional[dict] = None, deserializer_kwargs: Optional[dict] = None, **kwargs, ): """ The NativeObject listener using the YARP Bottle construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a Python object. :param name: str: Name of the client :param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp' :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param serializer_kwargs: dict: Additional kwargs for the serializer :param deserializer_kwargs: dict: Additional kwargs for the deserializer :param persistent: bool: Whether to keep the service connection alive across multiple service calls. Default is True :param serializer_kwargs: dict: Additional kwargs for the serializer :param deserializer_kwargs: dict: Additional kwargs for the deserializer :param kwargs: dict: Additional kwargs for the client """ super().__init__( name, in_topic, carrier=carrier, persistent=persistent, **kwargs ) self._port = None self._queue = queue.Queue(maxsize=1) self._plugin_encoder = JsonEncoder self._plugin_kwargs = kwargs self._serializer_kwargs = serializer_kwargs or {} self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook self._deserializer_kwargs = deserializer_kwargs or {}
[docs] def establish(self): """ Establish the client's connection to the YARP service. """ while not yarp.Network.exists(self.in_topic): logging.info(f"[YARP] Waiting for input port: {self.in_topic}") time.sleep(0.2) self._port = yarp.RpcClient() rnd_id = str(np.random.randint(100000, size=1)[0]) self._port.open(self.in_topic + ":in" + rnd_id) self._port.addOutput(self.in_topic, self.carrier) if self.persistent: self.established = True
[docs] def request(self, *args, **kwargs): """ Send a request to the YARP service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request :return: Any: The response from the YARP service """ if not self.established: self.establish() try: self._request(*args, **kwargs) except Exception as e: logging.error("[YARP] Service call failed: %s" % e) return self._await_reply()
def _request(self, *args, **kwargs): """ Internal method to send a request to the YARP service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request """ args_str = json.dumps( [args, kwargs], cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) args_msg = yarp.Bottle() args_msg.clear() args_msg.addString(args_str) msg = yarp.Bottle() msg.clear() self._port.write(args_msg, msg) obj = json.loads( msg.get(0).asString(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) self._queue.put(obj, block=False) def _await_reply(self): """ Wait for and return the reply from the YARP service. :return: Any: The response from the YARP service """ try: reply = self._queue.get(block=True) return reply except queue.Full: logging.warning( f"[YARP] Discarding data because queue is full. " f"This happened due to bad synchronization in {self.__name__}" ) return None
[docs] @Clients.register("Image", "yarp") class YarpImageClient(YarpNativeObjectClient):
[docs] def __init__( self, name: str, in_topic: str, carrier: Literal["tcp", "udp", "mcast"] = "tcp", width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, persistent: bool = True, serializer_kwargs: Optional[dict] = None, **kwargs, ): """ The Image client using the YARP Bottle construct parsed to a numpy array. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp' :param width: int: The width of the image. Default is -1 :param height: int: The height of the image. Default is -1 :param rgb: bool: Whether the image is RGB. Default is True :param fp: bool: Whether to utilize floating-point precision. Default is False :param persistent: bool: Whether to keep the service connection alive across multiple service calls. Default is True :param serializer_kwargs: dict: Additional kwargs for the serializer """ if "jpg" in kwargs: logging.warning( "[YARP] YARP currently does not support JPG encoding in REQ/REP. Using raw image." ) kwargs.pop("jpg") super().__init__( name, in_topic, carrier=carrier, persistent=persistent, serializer_kwargs=serializer_kwargs, **kwargs, ) self.width = width self.height = height self.rgb = rgb self.fp = fp
def _request(self, *args, **kwargs): """ Internal method to send a request to the YARP service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request """ args_str = json.dumps( [args, kwargs], cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) args_msg = yarp.Bottle() args_msg.clear() args_msg.addString(args_str) msg = yarp.Bottle() msg.clear() self._port.write(args_msg, msg) img = json.loads( msg.get(0).asString(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) height, width, channels = img.shape if 0 < self.width != width or 0 < self.height != height: raise ValueError("Incorrect image shape for client") else: self._queue.put(img, block=False)
[docs] @Clients.register("AudioChunk", "yarp") class YarpAudioChunkClient(YarpNativeObjectClient):
[docs] def __init__( self, name: str, in_topic: str, carrier: Literal["tcp", "udp", "mcast"] = "tcp", channels: int = 1, rate: int = 44100, chunk: int = -1, persistent: bool = True, serializer_kwargs: Optional[dict] = None, **kwargs, ): """ The AudioChunk client using the YARP Bottle construct parsed to a numpy array. :param name: str: Name of the client :param in_topic: str: Name of the input topic preceded by '/' (e.g. '/topic') :param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp' :param channels: int: Number of audio channels. Default is 1 :param rate: int: Sampling rate of the audio. Default is 44100 :param chunk: int: The size of audio chunks. Default is -1 :param persistent: bool: Whether to keep the service connection alive across multiple service calls. Default is True :param serializer_kwargs: dict: Additional kwargs for the serializer """ super().__init__( name, in_topic, carrier=carrier, persistent=persistent, serializer_kwargs=serializer_kwargs, **kwargs, ) self.channels = channels self.rate = rate self.chunk = chunk
def _request(self, *args, **kwargs): """ Internal method to send a request to the YARP service. :param args: tuple: Positional arguments to send in the request :param kwargs: dict: Keyword arguments to send in the request """ args_str = json.dumps( [args, kwargs], cls=self._plugin_encoder, **self._plugin_kwargs, serializer_kwrags=self._serializer_kwargs, ) args_msg = yarp.Bottle() args_msg.clear() args_msg.addString(args_str) msg = yarp.Bottle() msg.clear() self._port.write(args_msg, msg) chunk, channels, rate, aud = json.loads( msg.get(0).asString(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs, ) if 0 < self.rate != rate: raise ValueError("Incorrect audio rate for client") if ( 0 < self.chunk != chunk or self.channels != channels or aud.size != chunk * channels ): raise ValueError("Incorrect audio shape for client") else: self._queue.put((aud, rate), block=False)