import logging
import json
from typing import Optional, Literal, Tuple
import numpy as np
import yarp
from wrapyfi.connect.servers import Server, Servers
from wrapyfi.middlewares.yarp import YarpMiddleware
from wrapyfi.utils.serialization_encoders import JsonEncoder, JsonDecodeHook
[docs]
class YarpServer(Server):
[docs]
def __init__(
self,
name: str,
out_topic: str,
carrier: Literal["tcp", "udp", "mcast"] = "tcp",
out_topic_connect: Optional[str] = None,
persistent: bool = True,
yarp_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Initialize the server.
:param name: str: Name of the server
:param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic')
:param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp'
:param out_topic_connect: str: Name of the output topic connection alias '/' (e.g. '/topic:out') to connect to.
None appends ':out' to the out_topic. Default is None
:param yarp_kwargs: dict: Additional kwargs for the Yarp middleware
:param kwargs: dict: Additional kwargs for the server
"""
super().__init__(
name,
out_topic,
carrier=carrier,
out_topic_connect=out_topic_connect,
**kwargs,
)
YarpMiddleware.activate(**yarp_kwargs or {})
self.style = yarp.ContactStyle()
self.style.persistent = persistent
self.style.carrier = self.carrier
self.persistent = persistent
[docs]
def close(self):
"""
Close the server.
"""
if hasattr(self, "_port") and self._port:
if self._port is not None:
self._port.close()
def __del__(self):
self.close()
[docs]
@Servers.register("NativeObject", "yarp")
class YarpNativeObjectServer(YarpServer):
[docs]
def __init__(
self,
name: str,
out_topic: str,
carrier: Literal["tcp", "udp", "mcast"] = "tcp",
out_topic_connect: Optional[str] = None,
persistent: bool = True,
serializer_kwargs: Optional[dict] = None,
deserializer_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Specific server handling native Python objects, serializing them to JSON strings for transmission.
:param name: str: Name of the server
:param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic')
:param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp'
:param out_topic_connect: str: Name of the output topic connection alias '/' (e.g. '/topic:out') to connect to.
None appends ':out' to the out_topic. Default is None
:param persistent: bool: Whether the server port should remain connected after closure. Default is True
:param serializer_kwargs: dict: Additional kwargs for the serializer
:param deserializer_kwargs: dict: Additional kwargs for the deserializer
"""
super().__init__(
name,
out_topic,
carrier=carrier,
out_topic_connect=out_topic_connect,
persistent=persistent,
**kwargs,
)
self._plugin_encoder = JsonEncoder
self._plugin_kwargs = kwargs
self._serializer_kwargs = serializer_kwargs or {}
self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook
self._deserializer_kwargs = deserializer_kwargs or {}
self._port = self._netconnect = None
[docs]
def establish(self):
"""
Establish the connection to the server.
"""
self._port = yarp.RpcServer()
self._port.open(self.out_topic)
if self.style.persistent:
self._netconnect = yarp.Network.connect(
self.out_topic, self.out_topic_connect, self.style
)
else:
self._netconnect = yarp.Network.connect(
self.out_topic, self.out_topic_connect, self.carrier
)
self._netconnect = yarp.Network.connect(
self.out_topic, self.out_topic_connect, self.carrier
)
if self.persistent:
self.established = True
[docs]
def await_request(self, *args, **kwargs):
"""
Await and deserialize the client's request, returning the extracted arguments and keyword arguments.
The method blocks until a message is received, then attempts to deserialize it using the configured JSON decoder
hook, returning the extracted arguments and keyword arguments.
:return: Tuple[list, dict]: A tuple containing two items:
- A list of arguments extracted from the received message
- A dictionary of keyword arguments extracted from the received message
"""
if not self.established:
self.establish()
try:
obj_msg = yarp.Bottle()
obj_msg.clear()
request = False
while not request:
request = self._port.read(obj_msg, True)
[args, kwargs] = json.loads(
obj_msg.get(0).asString(),
object_hook=self._plugin_decoder_hook,
**self._deserializer_kwargs,
)
return args, kwargs
except Exception as e:
logging.error("[YARP] Service call failed: %s" % e)
return [], {}
[docs]
def reply(self, obj):
"""
Serialize the provided Python object to a JSON string and send it as a reply to the client.
The method uses the configured JSON encoder for serialization before sending the resultant string to the client.
:param obj: Any: The Python object to be serialized and sent
"""
obj_str = json.dumps(
obj,
cls=self._plugin_encoder,
**self._plugin_kwargs,
serializer_kwrags=self._serializer_kwargs,
)
obj_msg = yarp.Bottle()
obj_msg.clear()
obj_msg.addString(obj_str)
if self.persistent:
self._port.reply(obj_msg)
else:
self._port.replyAndDrop(obj_msg)
[docs]
@Servers.register("Image", "yarp")
class YarpImageServer(YarpNativeObjectServer):
[docs]
def __init__(
self,
name: str,
out_topic: str,
carrier: Literal["tcp", "udp", "mcast"] = "tcp",
out_topic_connect: Optional[str] = None,
persistent: bool = True,
width: int = -1,
height: int = -1,
rgb: bool = True,
fp: bool = False,
deserializer_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Specific server handling image data as numpy arrays, serializing them to JSON strings for transmission.
:param name: str: Name of the server
:param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic')
:param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp'
:param out_topic_connect: str: Name of the output topic connection alias '/' (e.g. '/topic:out') to connect to.
None appends ':out' to the out_topic. Default is None
:param persistent: bool: Whether the server port should remain connected after closure. Default is True
:param width: int: Width of the image. Default is -1 (use the width of the received image)
:param height: int: Height of the image. Default is -1 (use the height of the received image)
:param rgb: bool: True if the image is RGB, False if it is grayscale. Default is True
:param fp: bool: True if the image is floating point, False if it is integer. Default is False
:param deserializer_kwargs: dict: Additional kwargs for the deserializer
"""
if "jpg" in kwargs:
logging.warning(
"[YARP] YARP currently does not support JPG encoding in REQ/REP. Using raw image."
)
kwargs.pop("jpg")
super().__init__(
name,
out_topic,
carrier=carrier,
out_topic_connect=out_topic_connect,
persistent=persistent,
deserializer_kwargs=deserializer_kwargs,
**kwargs,
)
self.width = width
self.height = height
self.rgb = rgb
self.fp = fp
[docs]
def reply(self, img: np.ndarray):
"""
Serialize the provided image data and send it as a reply to the client.
:param img: np.ndarray: Image to send formatted as a cv2 image - np.ndarray[img_height, img_width, channels]
"""
if (
0 < self.width != img.shape[1]
or 0 < self.height != img.shape[0]
or not (
(img.ndim == 2 and not self.rgb)
or (img.ndim == 3 and self.rgb and img.shape[2] == 3)
)
):
raise ValueError("Incorrect image shape for publisher")
# img = np.require(img, dtype=self._type, requirements='C')
super().reply(img)
[docs]
@Servers.register("AudioChunk", "yarp")
class YarpAudioChunkServer(YarpNativeObjectServer):
[docs]
def __init__(
self,
name: str,
out_topic: str,
carrier: Literal["tcp", "udp", "mcast"] = "tcp",
out_topic_connect: Optional[str] = None,
persistent: bool = True,
channels: int = 1,
rate: int = 44100,
chunk: int = -1,
deserializer_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Specific server handling audio data as numpy arrays, serializing them to JSON strings for transmission.
:param name: str: Name of the server
:param out_topic: str: Name of the output topic preceded by '/' (e.g. '/topic')
:param carrier: str: Carrier protocol (e.g. 'tcp'). Default is 'tcp'
:param out_topic_connect: str: Name of the output topic connection alias '/' (e.g. '/topic:out') to connect to.
None appends ':out' to the out_topic. Default is None
:param persistent: bool: Whether the server port should remain connected after closure. Default is True
:param channels: int: Number of channels in the audio. Default is 1
:param rate: int: Sampling rate of the audio. Default is 44100
:param chunk: int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)
:param deserializer_kwargs: dict: Additional kwargs for the deserializer
"""
super().__init__(
name,
out_topic,
carrier=carrier,
out_topic_connect=out_topic_connect,
persistent=persistent,
deserializer_kwargs=deserializer_kwargs,
**kwargs,
)
self.channels = channels
self.rate = rate
self.chunk = chunk
[docs]
def reply(self, aud: Tuple[np.ndarray, int]):
"""
Serialize the provided audio data and send it as a reply to the client.
:param aud: Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate])
"""
aud, rate = aud
if aud is None:
return
if 0 < self.rate != rate:
raise ValueError("Incorrect audio rate for server reply")
chunk, channels = aud.shape if len(aud.shape) > 1 else (aud.shape[0], 1)
self.chunk = chunk if self.chunk == -1 else self.chunk
self.channels = channels if self.channels == -1 else self.channels
if 0 < self.chunk != chunk or 0 < self.channels != channels:
raise ValueError("Incorrect audio shape for publisher")
aud = np.require(aud, dtype=np.float32, requirements="C")
super().reply((chunk, channels, rate, aud))