Source code for examples.robots.icub_head

"""
iCub Head Controller and Camera Viewer using Wrapyfi for Communication

This script demonstrates the capability to control the iCub robot's head and view its camera feed using
the MiddlewareCommunicator within the Wrapyfi library. The communication follows the PUB/SUB pattern,
allowing for message publishing and listening functionalities between processes or machines.

Demonstrations:
    - Using Image messages for camera feed transmission.
    - Running publishers and listeners concurrently with the yarp.RFModule.
    - Using Wrapyfi for creating a port listener only.

Requirements:
    - Wrapyfi: Middleware communication wrapper (refer to the Wrapyfi documentation for installation instructions)
    - YARP, ROS, ROS 2, ZeroMQ (refer to the Wrapyfi documentation for installation instructions)
    - iCub Robot and Simulator: Ensure the robot and its simulator are installed and configured.
        When running in simulation mode, the `iCub_SIM` must be running in a standalone terminal
        (refer to the Wrapyfi documentation for installation instructions)
    - NumPy: Used for creating arrays (installed with Wrapyfi)
    - SciPy: For applying smoothing filters to the facial expressions (refer to https://www.scipy.org/install.html for installation instructions)
    - Pexpect: To control the facial expressions using RPC

    Install using pip:
    ``pip install "scipy==1.9.0" pexpect``

Run:
    # For the list of keyboard controls, refer to the comments in Keyboard Controls.

    # Alternative 1: Simulation Mode
        # Ensure that the `iCub_SIM` is running in a standalone terminal.

        # The listener displays images, and coordinates are published without utilizing Wrapyfi's utilities.

        ``python3 icub_head.py --simulation --get_cam_feed --control_head --control_expressions``

    # Alternative 2: Physical Robot
        # The listener displays images, and coordinates are published without utilizing Wrapyfi's utilities.

        ``python3 icub_head.py --get_cam_feed --control_head --control_expressions``

Keyboard Controls:
    - Head Control:
        - Up/Down: Control the head pitch
        - Right/Left: Control the head yaw
        - A/D: Control the head roll (right/left)
        - R: Reset the head to the initial position
        - Esc: Quit the application
    - Eye Control:
        - W/S: Control the eye pitch (up/down)
        - C/Z: Control the eye yaw (right/left)
        - R: Reset the eye to the initial position
        - Esc: Quit the application
    - Facial Expressions:
        - 0: Neutral
        - 1: Happy
        - 2: Sad
        - 3: Surprise
        - 4: Fear
        - 5: Disgust
        - 6: Anger
        - 7: Contempt
        - 8: Cunning
        - 9: Shy
        - Esc: Quit the application
    - Camera Feed:
        - Esc: Quit the application
"""

import os
import time
import argparse
import logging
from collections import deque

import cv2
import numpy as np
import yarp

try:
    import pexpect

    HAVE_PEXPECT = True
except ImportError:
    HAVE_PEXPECT = False

from wrapyfi.connect.wrapper import MiddlewareCommunicator

ICUB_DEFAULT_COMMUNICATOR = os.environ.get("WRAPYFI_DEFAULT_COMMUNICATOR", "yarp")
ICUB_DEFAULT_COMMUNICATOR = os.environ.get(
    "WRAPYFI_DEFAULT_MWARE", ICUB_DEFAULT_COMMUNICATOR
)
ICUB_DEFAULT_COMMUNICATOR = os.environ.get(
    "ICUB_DEFAULT_COMMUNICATOR", ICUB_DEFAULT_COMMUNICATOR
)
ICUB_DEFAULT_COMMUNICATOR = os.environ.get(
    "ICUB_DEFAULT_MWARE", ICUB_DEFAULT_COMMUNICATOR
)

EMOTION_LOOKUP = {
    "Neutral": [("LIGHTS", "neu")],
    "Happy": [("all", "hap")],
    "Sad": [("LIGHTS", "sad")],
    "Surprise": [("LIGHTS", "sur")],
    "Fear": [("raw", "L04"), ("raw", "R04"), ("raw", "M66")],  # change to array
    "Disgust": [("raw", "L01"), ("raw", "R01"), ("raw", "M66")],  # change to array
    "Anger": [("LIGHTS", "ang")],
    "Contempt": [("raw", "L01"), ("raw", "R09"), ("raw", "ME9")],  # change to array
    "Cunning": [("LIGHTS", "cun")],
    "Shy": [("LIGHTS", "shy")],
    "Evil": [("LIGHTS", "evi")],
}


[docs] def cartesian_to_spherical(xyz=None, x=None, y=None, z=None, expand_return=None): """ Convert cartesian coordinates to spherical coordinates. :param xyz: tuple: Cartesian coordinates (x, y, z) :param x: float: Cartesian coordinate x :param y: float: Cartesian coordinate y :param z: float: Cartesian coordinate z :param expand_return: bool: Whether to return the spherical coordinates as a dictionary or not :return: tuple: Spherical coordinates (p, t, r) or dictionary: Spherical coordinates {"p": p, "t": t, "r": r} """ from operator import xor import numpy as np assert xor((xyz is not None), all((x is not None, y is not None, z is not None))) if expand_return is None: expand_return = False if xyz is None else True if xyz is None: xyz = (x, y, z) ptr = np.zeros((3,)) xy = xyz[0] ** 2 + xyz[1] ** 2 ptr[0] = np.arctan2(xyz[1], xyz[0]) ptr[1] = np.arctan2( xyz[2], np.sqrt(xy) ) # for elevation angle defined from XY-plane up # ptr[1] = np.arctan2(np.sqrt(xy), xyz[2]) # for elevation angle defined from Z-axis down ptr[2] = np.sqrt(xy + xyz[2] ** 2) return ptr if not expand_return else {"p": ptr[0], "t": ptr[1], "r": ptr[2]}
[docs] def mode_smoothing_filter(time_series, default, window_length=6, min_count=None): """ Apply a smoothing filter to the time series using the mode of the last `window_length` values. :param time_series: list: Time series to apply the smoothing filter to :param default: object: Default value to return if the mode count is less than `min_count` :param window_length: int: Length of the window to apply the smoothing filter to :param min_count: int: Minimum number of values in the window to apply the smoothing filter """ import scipy.stats if min_count is None: min_count = window_length // 2 mode = scipy.stats.mode(time_series[-window_length:]) return mode.mode[0] if mode.count >= min_count else default
[docs] class ICub(MiddlewareCommunicator, yarp.RFModule): """ ICub head controller, facial expression transmitter and camera viewer. Head control can be achieved following two methods: 1. Using the `control_head_gaze` method, which controls the head gaze in the spherical coordinate system. 2. Using the `control_gaze_at_plane` method, which controls the head gaze in the cartesian coordinate system. Emotions can be controlled using the `update_facial_expressions` method. Camera feed can be viewed using the `receive_images` method. """ MWARE = ICUB_DEFAULT_COMMUNICATOR CAP_PROP_FRAME_WIDTH = 320 CAP_PROP_FRAME_HEIGHT = 240 HEAD_COORDINATES_PORT = "/control_interface/head_coordinates" EYE_COORDINATES_PORT = "/control_interface/eye_coordinates" GAZE_PLANE_COORDINATES_PORT = "/control_interface/gaze_plane_coordinates" FACIAL_EXPRESSIONS_PORT = "/control_interface/facial_expressions" # constants FACIAL_EXPRESSIONS_QUEUE_SIZE = 7 FACIAL_EXPRESSION_SMOOTHING_WINDOW = 6
[docs] def __init__( self, simulation=False, headless=False, get_cam_feed=True, img_width=CAP_PROP_FRAME_WIDTH, img_height=CAP_PROP_FRAME_HEIGHT, control_head=True, set_head_coordinates=True, head_coordinates_port=HEAD_COORDINATES_PORT, control_eyes=True, set_eye_coordinates=True, eye_coordinates_port=EYE_COORDINATES_PORT, ikingaze=False, gaze_plane_coordinates_port=GAZE_PLANE_COORDINATES_PORT, control_expressions=False, set_facial_expressions=True, facial_expressions_port=FACIAL_EXPRESSIONS_PORT, mware=MWARE, ): """ Initialize the ICub head controller, facial expression transmitter and camera viewer. :param simulation: bool: Whether to run the simulation or not :param headless: bool: Whether to run the headless mode or not :param get_cam_feed: bool: Whether to get (listen) the camera feed or not :param img_width: int: Width of the image :param img_height: int: Height of the image :param control_head: bool: Whether to control the head :param set_head_coordinates: bool: Whether to set (publish) the head coordinates :param head_coordinates_port: str: Port to receive the head coordinates for controlling the head :param control_eyes: bool: Whether to control the eyes :param set_eye_coordinates: bool: Whether to set (publish) the eye coordinates :param eye_coordinates_port: str: Port to receive the eye coordinates for controlling the eyes :param ikingaze: bool: Whether to use the iKinGazeCtrl :param gaze_plane_coordinates_port: str: Port to receive the gaze plane coordinates for controlling the head/eyes :param control_expressions: bool: Whether to control the facial expressions :param set_facial_expressions: bool: Whether to set (publish) the facial expressions :param facial_expressions_port: str: Port to receive the facial expressions for controlling the facial expressions :param mware: str: Middleware to use """ self.__name__ = "iCubController" MiddlewareCommunicator.__init__(self) yarp.RFModule.__init__(self) self.MWARE = mware self.FACIAL_EXPRESSIONS_PORT = facial_expressions_port self.GAZE_PLANE_COORDINATES_PORT = gaze_plane_coordinates_port self.HEAD_COORDINATES_PORT = head_coordinates_port self.EYE_COORDINATES_PORT = eye_coordinates_port self.headless = headless self.ikingaze = ikingaze # prepare a property object props = yarp.Property() props.put("device", "remote_controlboard") props.put("local", "/client/head") if simulation: props.put("remote", "/icubSim/head") self.cam_props = { "cam_world_port": "/icubSim/cam", "cam_left_port": "/icubSim/cam/left", "cam_right_port": "/icubSim/cam/right", } emotion_cmd = f"yarp rpc /icubSim/face/emotions/in" else: props.put("remote", "/icub/head") self.cam_props = { "cam_world_port": "/icub/cam/left", "cam_left_port": "/icub/cam/left", "cam_right_port": "/icub/cam/right", } emotion_cmd = f"yarp rpc /icub/face/emotions/in" if img_width is not None: self.img_width = img_width self.CAP_PROP_FRAME_WIDTH = img_width self.cam_props["img_width"] = img_width if img_height is not None: self.img_height = img_height self.CAP_PROP_FRAME_HEIGHT = img_height self.cam_props["img_height"] = img_height if control_expressions: if HAVE_PEXPECT: # control emotional expressions using RPC self.client = pexpect.spawn(emotion_cmd) else: logging.error( "pexpect must be installed to control the emotion interface" ) self.activate_communication(self.update_facial_expressions, "disable") self.last_expression = [ "", "", ] # (emotion part on the robot's face , emotional expression category) self.expressions_queue = deque(maxlen=self.FACIAL_EXPRESSIONS_QUEUE_SIZE) else: self.activate_communication(self.update_facial_expressions, "disable") self._curr_eyes = [0, 0, 0] self._curr_head = [0, 0, 0] if control_head or control_eyes: if ikingaze: self._gaze_encs = yarp.Vector(3, 0.0) props_gaze = yarp.Property() props_gaze.clear() props_gaze.put("device", "gazecontrollerclient") props_gaze.put("remote", "/iKinGazeCtrl") props_gaze.put("local", "/client/gaze") # self._gaze_driver = yarp.PolyDriver(props_gaze) self._igaze = self._gaze_driver.viewIGazeControl() self._igaze.setStabilizationMode(True) # set movement speed # self.update_head_gaze_speed(head=0.8) # self.update_eye_gaze_speed(eye=0.5) self.activate_communication(self.control_head_gaze, "disable") self.activate_communication(self.control_eye_gaze, "disable") self.activate_communication(self._control_head_eye_gaze, "disable") else: # create remote driver self._head_driver = yarp.PolyDriver(props) # query motor control interfaces self._ipos = self._head_driver.viewIPositionControl() self._ienc = self._head_driver.viewIEncoders() # retrieve number of joints self._num_jnts = self._ipos.getAxes() logging.info(f"controlling {self._num_jnts} joints") # read encoders self._encs = yarp.Vector(self._num_jnts) self._ienc.getEncoders(self._encs.data()) if not control_head: self.activate_communication(self.control_head_gaze, "disable") self.activate_communication(self.update_head_gaze_speed, "disable") if not control_eyes: self.activate_communication(self.control_eye_gaze, "disable") self.activate_communication(self.update_eye_gaze_speed, "disable") self.init_pos_head = yarp.Vector(self._num_jnts, self._encs.data()) self.init_pos_eyes = yarp.Vector(self._num_jnts, self._encs.data()) self.init_pos = yarp.Vector(self._num_jnts, self._encs.data()) # set movement speed # self.update_head_gaze_speed(pitch=10.0, roll=10.0, yaw=20.0) # self.update_eye_gaze_speed(pitch=10.0, yaw=10.0, vergence=20.0) else: self.activate_communication(self.reset_gaze, "disable") self.activate_communication(self.update_head_gaze_speed, "disable") self.activate_communication(self.control_head_gaze, "disable") self.activate_communication(self.update_eye_gaze_speed, "disable") self.activate_communication(self.control_eye_gaze, "disable") self.activate_communication(self._control_head_eye_gaze, "disable") self.activate_communication(self.wait_for_gaze, "disable") self.activate_communication(self.control_gaze_at_plane, "disable") if get_cam_feed: # control the listening properties from within the app self.activate_communication(self.receive_images, "listen") if facial_expressions_port: if set_facial_expressions: self.activate_communication(self.acquire_facial_expressions, "publish") else: for _ in range(self.FACIAL_EXPRESSIONS_QUEUE_SIZE): self.update_facial_expressions("hap", part="all", smoothing="mode") self.activate_communication(self.acquire_facial_expressions, "listen") if head_coordinates_port: if set_head_coordinates: self.activate_communication(self.acquire_head_coordinates, "publish") else: self.activate_communication(self.acquire_head_coordinates, "listen") if eye_coordinates_port: if set_eye_coordinates: self.activate_communication(self.acquire_eye_coordinates, "publish") else: self.activate_communication(self.acquire_eye_coordinates, "listen") if gaze_plane_coordinates_port: self.activate_communication(self.control_gaze_at_plane, "listen") self.build()
[docs] def build(self): """ Updates the default method arguments according to constructor arguments. This method is called by the module constructor. It is not necessary to call it manually. """ ICub.acquire_head_coordinates.__defaults__ = ( self.HEAD_COORDINATES_PORT, None, self.MWARE, ) ICub.acquire_eye_coordinates.__defaults__ = ( self.EYE_COORDINATES_PORT, None, self.MWARE, ) ICub.receive_gaze_plane_coordinates.__defaults__ = ( self.GAZE_PLANE_COORDINATES_PORT, self.MWARE, ) ICub.wait_for_gaze.__defaults__ = (True, self.MWARE) ICub.reset_gaze.__defaults__ = (self.MWARE,) ICub.update_head_gaze_speed.__defaults__ = (10.0, 10.0, 20.0, 0.8, self.MWARE) ICub.control_head_gaze.__defaults__ = (0.0, 0.0, 0.0, "xyz", self.MWARE) ICub.update_eye_gaze_speed.__defaults__ = (10.0, 10.0, 20.0, 0.5, self.MWARE) ICub.control_eye_gaze.__defaults__ = (0.0, 0.0, 0.0, self.MWARE) ICub._control_head_eye_gaze.__defaults__ = (self.MWARE,) ICub.control_gaze_at_plane.__defaults__ = ( 0, 0, 0.3, 0.3, True, True, self.MWARE, ) ICub.acquire_facial_expressions.__defaults__ = ( self.FACIAL_EXPRESSIONS_PORT, None, self.MWARE, ) ICub.update_facial_expressions.__defaults__ = (None, False, "mode", self.MWARE) ICub.receive_images.__defaults__ = ( self.CAP_PROP_FRAME_WIDTH, self.CAP_PROP_FRAME_HEIGHT, True, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "$head_coordinates_port", should_wait=False ) def acquire_head_coordinates( self, head_coordinates_port=HEAD_COORDINATES_PORT, cv2_key=None, _mware=MWARE, **kwargs, ): """ Acquire head coordinates for controlling the iCub. :param head_coordinates_port: str: Port to receive head coordinates :param cv2_key: int: Key pressed by the user :return: dict: Head orientation coordinates """ if cv2_key is None: logging.error("controlling orientation in headless mode not yet supported") return (None,) else: if cv2_key == 27: # Esc key to exit exit(0) elif cv2_key == -1: # normally -1 returned,so don't print it pass # the keyboard commands for controlling the robot elif cv2_key == 82: # Up key self._curr_head[0] += 1 logging.info("head pitch up") elif cv2_key == 84: # Down key self._curr_head[0] -= 1 logging.info("head pitch down") elif cv2_key == 83: # Right key self._curr_head[2] -= 1 logging.info("head yaw left") elif cv2_key == 81: # Left key self._curr_head[2] += 1 logging.info("head yaw right") elif cv2_key == 97: # A key self._curr_head[1] -= 1 logging.info("head roll right") elif cv2_key == 100: # D key self._curr_head[1] += 1 logging.info("head roll left") elif cv2_key == 114: # R key: reset the orientation self._curr_head = [0, 0, 0] self.reset_gaze() logging.info("resetting the orientation") else: logging.info(cv2_key) # else print its value return (None,) return ( { "topic": head_coordinates_port.split("/")[-1], "timestamp": time.time(), "pitch": self._curr_head[0], "roll": self._curr_head[1], "yaw": self._curr_head[2], "order": "zyx", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "$eye_coordinates_port", should_wait=False ) def acquire_eye_coordinates( self, eye_coordinates_port=EYE_COORDINATES_PORT, cv2_key=None, _mware=MWARE, **kwargs, ): """ Acquire eye coordinates for controlling the iCub. :param eye_coordinates_port: str: Port to receive eye coordinates :param cv2_key: int: Key pressed by the user :return: dict: Eye oreintation coordinates """ if cv2_key is None: logging.error("controlling orientation in headless mode not yet supported") return (None,) else: if cv2_key == 27: # Esc key to exit exit(0) elif cv2_key == -1: # normally -1 returned,so don't print it pass # the keyboard commands for controlling the robot elif cv2_key == 119: # W key self._curr_eyes[0] += 1 logging.info("eye pitch up") elif cv2_key == 115: # S key self._curr_eyes[0] -= 1 logging.info("eye pitch down") elif cv2_key == 122: # Z key self._curr_eyes[1] -= 1 logging.info("eye yaw left") elif cv2_key == 99: # C key self._curr_eyes[1] += 1 logging.info("eye yaw right") elif cv2_key == 114: # R key: reset the orientation self._curr_eyes = [0, 0, 0] self.reset_gaze() logging.info("resetting the orientation") else: logging.info(cv2_key) # else print its value return (None,) return ( { "topic": eye_coordinates_port.split("/")[-1], "timestamp": time.time(), "pitch": self._curr_eyes[0], "yaw": self._curr_eyes[1], "vergence": self._curr_eyes[2], }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "$gaze_plane_coordinates_port", should_wait=False, ) def receive_gaze_plane_coordinates( self, gaze_plane_coordinates_port=GAZE_PLANE_COORDINATES_PORT, _mware=MWARE, **kwargs, ): """ Receive gaze plane (normalized x,y) coordinates for controlling the iCub. :param gaze_plane_coordinates_port: str: Port to receive gaze plane coordinates :return: dict: Gaze plane coordinates """ return (None,)
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/wait_for_gaze", should_wait=False, ) def wait_for_gaze(self, reset=True, _mware=MWARE): """ Wait for the gaze actuation to complete. :param reset: bool: Whether to reset the gaze location (centre) :param _mware: str: Middleware to use :return: dict: Gaze waiting log for a given time step """ if self.ikingaze: # self._igaze.clearNeckPitch() # self._igaze.clearNeckRoll() # self._igaze.clearNeckYaw() # self._igaze.clearEyes() if reset: self._igaze.lookAtAbsAngles(self._gaze_encs) self._igaze.waitMotionDone(timeout=2.0) else: if reset: self._ipos.positionMove(self._encs.data()) while not self._ipos.checkMotionDone(): pass return ( { "topic": "logging_wait_for_gaze", "timestamp": time.time(), "command": f"waiting for gaze completed with reset={reset}", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/reset_gaze", should_wait=False, ) def reset_gaze(self, _mware=MWARE): """ Reset the eyes and head to their original position. :param _mware: str: Middleware to use :return: dict: Gaze reset log for a given time step """ self.wait_for_gaze(reset=True) return ( { "topic": "logging_reset_gaze", "timestamp": time.time(), "command": f"reset gaze", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$mware", "ICub", "/icub_controller/logs/head_speed", should_wait=False, ) def update_head_gaze_speed( self, pitch=10.0, roll=10.0, yaw=20.0, head=0.8, _mware=MWARE, **kwargs ): """ Control the iCub head speed. :param pitch: float->pitch[deg/s]: Pitch speed :param roll: float->roll[deg/s]: Roll speed :param yaw: float->yaw[deg/s]: Yaw speed :param head: float->speed[0,1]: Neck trajectory speed in normalized units (only when using iKinGazeCtrl) :param _mware: str: Middleware to use :return: dict: Head orientation speed log for a given time step """ if self.ikingaze: self._igaze.setNeckTrajTime(head) return ( { "topic": "logging_head_speed", "timestamp": time.time(), "command": f"head speed set to {head}", }, ) else: self._ipos.setRefSpeed(0, pitch) self._ipos.setRefSpeed(1, roll) self._ipos.setRefSpeed(2, yaw) return ( { "topic": "logging_head_speed", "timestamp": time.time(), "command": f"head speed set to {pitch, roll, yaw} (pitch, roll, yaw)", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$mware", "ICub", "/icub_controller/logs/eye_speed", should_wait=False, ) def update_eye_gaze_speed( self, pitch=10.0, yaw=10.0, vergence=20.0, eye=0.5, _mware=MWARE, **kwargs ): """ Control the iCub eye speed. :param pitch: float->pitch[deg/s]: Pitch speed :param yaw: float->yaw[deg/s]: Yaw speed :param vergence: float->vergence[deg/s]: Speed of vergence shift between the eyes :param eye: float->speed[0,1]: Eye trajectory speed in normalized units (only when using iKinGazeCtrl) :param _mware: str: Middleware to use :return: dict: Eye orientation speed log for a given time step """ if self.ikingaze: self._igaze.setEyesTrajTime(eye) return ( { "topic": "logging_eye_speed", "timestamp": time.time(), "command": f"eye speed set to {eye}", }, ) else: self._ipos.setRefSpeed(3, pitch) self._ipos.setRefSpeed(4, yaw) self._ipos.setRefSpeed(5, vergence) return ( { "topic": "logging_eye_speed", "timestamp": time.time(), "command": f"eye speed set to {pitch, yaw, vergence} (pitch, yaw, vergence)", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/head_orientation_coordinates", should_wait=False, ) def control_head_gaze( self, pitch=0.0, roll=0.0, yaw=0.0, order="xyz", _mware=MWARE, **kwargs ): """ Control the iCub head relative to previous coordinates following the roll,pitch,yaw convention (order=xyz) (initialized at 0 looking straight ahead). :param pitch: float->pitch[deg]: Pitch angle :param roll: float->roll[deg]: Roll angle :param yaw: float->yaw[deg]: Yaw angle :param order: str: Euler axis order. Only accepts xyz (roll, pitch, yaw) :param _mware: str: Middleware to use :return: dict: Head orientation coordinates log for a given time step """ if order != "xyz": logging.error( "only accepts ratation angles following the order='xyz' convention" ) return (None,) # wait for the action to complete # self.wait_for_gaze(reset=False) # initialize a new tmp vector identical to encs self.init_pos_head = yarp.Vector(self._num_jnts, self._encs.data()) # head control self.init_pos_head.set(0, self.init_pos_head.get(0) + pitch) # tilt/pitch self.init_pos_head.set(1, self.init_pos_head.get(1) + roll) # swing/roll self.init_pos_head.set(2, self.init_pos_head.get(2) + yaw) # pan/yaw # self._ipos.positionMove(self.init_pos_head.data()) self._curr_head = list((pitch, roll, yaw)) return ( { "topic": "logging_head_coordinates", "timestamp": time.time(), "command": f"head orientation set to {self._curr_head} (pitch, roll, yaw)", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/eye_orientation_coordinates", should_wait=False, ) def control_eye_gaze( self, pitch=0.0, yaw=0.0, vergence=0.0, _mware=MWARE, **kwargs ): """ Control the iCub eyes relative to previous coordinates (initialized at 0 looking straight ahead). :param pitch: float->pitch[deg]: Pitch angle :param yaw: float->yaw[deg]: Yaw (version) angle :param vergence: float->yaw[deg]: Vergence angle between the eyes :param _mware: str: Middleware to use :return: dict: Eye orientation coordinates log for a given time step """ # wait for the action to complete # self.wait_for_gaze(reset=False) # initialize a new tmp vector identical to encs self.init_pos_eyes = yarp.Vector(self._num_jnts, self._encs.data()) # eye control self.init_pos_eyes.set(3, self.init_pos_eyes.get(3) + pitch) # eye tilt self.init_pos_eyes.set(4, self.init_pos_eyes.get(4) + yaw) # eye pan/version self.init_pos_eyes.set( 5, self.init_pos_eyes.get(5) + vergence ) # the vergence between the eyes (to align, set to 0) # self._ipos.positionMove(self.init_pos_eyes.data()) self._curr_eyes = list((pitch, yaw, vergence)) return ( { "topic": "logging_eye_coordinates", "timestamp": time.time(), "command": f"eye orientation set to {self._curr_eyes} (pitch, yaw, vergence)", }, )
@MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/head_eye_orientation_coordinates", should_wait=False, ) def _control_head_eye_gaze(self, _mware=MWARE, **kwargs): """ Issue the movement command :param _mware: str: Middleware to use :return: dict: Head orientation coordinates log for a given time step """ # initialize a new tmp vector identical to encs self.init_pos = yarp.Vector(self._num_jnts, self._encs.data()) # head + eye control self.init_pos.set(0, self.init_pos_head.get(0)) # tilt/pitch self.init_pos.set(1, self.init_pos_head.get(1)) # swing/roll self.init_pos.set(2, self.init_pos_head.get(2)) # pan/yaw self.init_pos.set(3, self.init_pos_eyes.get(3)) # eye tilt self.init_pos.set(4, self.init_pos_eyes.get(4)) # eye pan/version self.init_pos.set( 5, self.init_pos_eyes.get(5) ) # the vergence between the eyes (to align, set to 0) self._ipos.positionMove(self.init_pos.data()) return ( { "topic": "logging_head_eye_coordinates", "timestamp": time.time(), "command": f"head orientation set to {self._curr_head} (pitch, roll, yaw) and eye orientation to {self._curr_eyes} (pitch, yaw, vergence)", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/gaze_plane_coordinates", should_wait=False, ) def control_gaze_at_plane( self, x=0.0, y=0.0, limit_x=0.3, limit_y=0.3, control_eyes=True, control_head=True, _mware=MWARE, **kwargs, ): """ Gaze at specific point in a normalized plane in front of the iCub. :param x: float->x[-1,1]: x coordinate in the plane limited to the range of -1 (left) and 1 (right) :param y: float->y[-1,1]: y coordinate in the plane limited to the range of -1 (bottom) and 1 (top) :param limit_x: float->limit_x[0,1]: x coordinate limit in the plane :param limit_y: float->limit_y[0,1]: y coordinate limit in the plane :param control_eyes: bool: Whether to control the eyes of the robot directly :param control_head: bool: Whether to control the head of the robot directly :return: dict: Gaze coordinates log for a given time step """ # wait for the action to complete # self.wait_for_gaze(reset=False) xy = np.array((x, y)) * np.array((limit_x, limit_y)) # limit viewing region ptr = cartesian_to_spherical(x=1, y=xy[0], z=-xy[1], expand_return=False) # initialize a new tmp vector identical to encs ptr_degrees = (np.rad2deg(ptr[0]), np.rad2deg(ptr[1])) if control_eyes and control_head: if not self.ikingaze: logging.error( "set ikingaze=True in order to move eyes and head simultaneously" ) return (None,) self.init_pos_ikin = yarp.Vector(3, self._gaze_encs.data()) self.init_pos_ikin.set(0, ptr_degrees[0]) self.init_pos_ikin.set(1, ptr_degrees[1]) self.init_pos_ikin.set(2, 0.0) self._igaze.lookAtAbsAngles(self.init_pos_ikin) elif control_head: if self.ikingaze: logging.error("set ikingaze=False in order to move head only") return (None,) self.control_head_gaze(pitch=ptr_degrees[1], roll=0, yaw=ptr_degrees[0]) elif control_eyes: if self.ikingaze: logging.error("set ikingaze=False in order to move eyes only") return (None,) self.control_eye_gaze(pitch=ptr_degrees[1], yaw=ptr_degrees[0], vergence=0) return ( { "topic": "logging_gaze_plane_coordinates", "timestamp": time.time(), "command": f"moving gaze toward {ptr_degrees} with head={control_head} and eyes={control_eyes}", }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "$facial_expressions_port", should_wait=False ) def acquire_facial_expressions( self, facial_expressions_port=FACIAL_EXPRESSIONS_PORT, cv2_key=None, _mware=MWARE, **kwargs, ): """ Acquire facial expressions from the iCub. :param facial_expressions_port: str: Port to acquire facial expressions from :param cv2_key: int: Key to press to set the facial expression :return: dict: Facial expressions log for a given time step """ emotion = None if cv2_key is None: logging.error("controlling expressions in headless mode not yet supported") return (None,) else: if cv2_key == 27: # Esc key to exit exit(0) elif cv2_key == -1: # normally -1 returned,so don"t print it pass elif cv2_key == 48: # 0 key: Neutral emotion emotion = "Neutral" logging.info("expressing neutrality") elif cv2_key == 49: # 1 key: Happy emotion emotion = "Happy" logging.info("expressing happiness") elif cv2_key == 50: # 2 key: Sad emotion emotion = "Sad" logging.info("expressing sadness") elif cv2_key == 51: # 3 key: Surprise emotion emotion = "Surprise" logging.info("expressing surprise") elif cv2_key == 52: # 4 key: Fear emotion emotion = "Fear" logging.info("expressing fear") elif cv2_key == 53: # 5 key: Disgust emotion emotion = "Disgust" logging.info("expressing disgust") elif cv2_key == 54: # 6 key: Anger emotion emotion = "Anger" logging.info("expressing anger") elif cv2_key == 55: # 7 key: Contempt emotion emotion = "Contempt" logging.info("expressing contempt") elif cv2_key == 56: # 8 key: Cunning emotion emotion = "Cunning" logging.info("expressing cunningness") elif cv2_key == 57: # 9 key: Shy emotion emotion = "Shy" logging.info("expressing shyness") else: logging.info(cv2_key) # else print its value return (None,) return ( { "topic": facial_expressions_port.split("/")[-1], "timestamp": time.time(), "emotion_category": emotion, }, )
[docs] @MiddlewareCommunicator.register( "NativeObject", "$_mware", "ICub", "/icub_controller/logs/facial_expressions", should_wait=False, ) def update_facial_expressions( self, expression, part=False, smoothing="mode", _mware=MWARE, **kwargs ): """ Control facial expressions of the iCub. :param expression: str: Expression to be controlled :param expression: str or tuple(str->part, str->emotion) or list[str] or list[tuple(str->part, str->emotion)]: Expression/s abbreviation or matching lookup table entry. If a list is provided, the actions are executed in sequence :param part: str: Abbreviation describing parts to control (refer to iCub documentation) ( mou, eli, leb, reb, all, raw, LIGHTS) :param smoothing: str: Name of smoothing filter to avoid abrupt changes in emotional expressions :return: Emotion log for a given time step """ if expression is None: return (None,) if isinstance(expression, (list, tuple)): expression = expression[-1] if smoothing == "mode": self.expressions_queue.append(expression) transmitted_expression = mode_smoothing_filter( list(self.expressions_queue), default="neu", window_length=self.FACIAL_EXPRESSION_SMOOTHING_WINDOW, ) else: transmitted_expression = expression expressions_lookup = EMOTION_LOOKUP.get( transmitted_expression, transmitted_expression ) if isinstance(expressions_lookup, str): expressions_lookup = [(part if part else "LIGHTS", expressions_lookup)] if ( self.last_expression[0] == (part if part else "LIGHTS") and self.last_expression[1] == transmitted_expression ): expressions_lookup = [] for part_lookup, expression_lookup in expressions_lookup: if part_lookup == "LIGHTS": self.client.sendline(f"set leb {expression_lookup}") self.client.expect(">>") self.client.sendline(f"set reb {expression_lookup}") self.client.expect(">>") self.client.sendline(f"set mou {expression_lookup}") self.client.expect(">>") logging.info(f"set leb/reb/mou {expression_lookup}") else: self.client.sendline(f"set {part_lookup} {expression_lookup}") self.client.expect(">>") logging.info(f"set {part_lookup} {expression_lookup}") self.last_expression[0] = part self.last_expression[1] = transmitted_expression return ( { "topic": "logging_facial_expressions", "timestamp": time.time(), "command": f"emotion set to {part} {expression} with smoothing={smoothing}", }, )
[docs] @MiddlewareCommunicator.register( "Image", "yarp", "ICub", "$cam_world_port", width="$img_width", height="$img_height", rgb="$_rgb", ) @MiddlewareCommunicator.register( "Image", "yarp", "ICub", "$cam_left_port", width="$img_width", height="$img_height", rgb="$_rgb", ) @MiddlewareCommunicator.register( "Image", "yarp", "ICub", "$cam_right_port", width="$img_width", height="$img_height", rgb="$_rgb", ) def receive_images( self, cam_world_port, cam_left_port, cam_right_port, img_width=CAP_PROP_FRAME_WIDTH, img_height=CAP_PROP_FRAME_HEIGHT, _rgb=True, ): """ Receive images from the iCub. :param cam_world_port: str: Port to receive images from the world camera :param cam_left_port: str: Port to receive images from the left camera :param cam_right_port: str: Port to receive images from the right camera :param img_width: int: Width of the image :param img_height: int: Height of the image :param _rgb: bool: Whether the image is RGB or not :return: Images from the iCub """ external_cam, left_cam, right_cam = None, None, None return external_cam, left_cam, right_cam
[docs] def getPeriod(self): """ Get the period of the module. :return: float: Period of the module """ return 0.01
[docs] def updateModule(self): # print(self.getPeriod()) external_cam, left_cam, right_cam = self.receive_images(**self.cam_props) if external_cam is None: external_cam = np.zeros((self.img_height, self.img_width, 1), dtype="uint8") left_cam = np.zeros((self.img_height, self.img_width, 1), dtype="uint8") right_cam = np.zeros((self.img_height, self.img_width, 1), dtype="uint8") else: external_cam = cv2.cvtColor(external_cam, cv2.COLOR_BGR2RGB) left_cam = cv2.cvtColor(left_cam, cv2.COLOR_BGR2RGB) right_cam = cv2.cvtColor(right_cam, cv2.COLOR_BGR2RGB) if not self.headless: cv2.imshow( "ICubCam", np.concatenate((left_cam, external_cam, right_cam), axis=1) ) k = cv2.waitKey(30) else: k = None (switch_emotion,) = self.acquire_facial_expressions( facial_expressions_port=self.FACIAL_EXPRESSIONS_PORT, cv2_key=k, _mware=self.MWARE, ) if switch_emotion is not None and isinstance(switch_emotion, dict): self.update_facial_expressions( switch_emotion.get("emotion_category", None), part=switch_emotion.get("part", False), _mware=self.MWARE, ) # move robot head (move_robot_head,) = self.acquire_head_coordinates( head_coordinates_port=self.HEAD_COORDINATES_PORT, cv2_key=k, _mware=self.MWARE, ) if move_robot_head is not None and isinstance(move_robot_head, dict): robot_head_speed = move_robot_head.get("speed", False) if robot_head_speed and isinstance(robot_head_speed, dict): self.update_head_gaze_speed( pitch=robot_head_speed.get("pitch", 10.0), roll=robot_head_speed.get("roll", 10.0), yaw=robot_head_speed.get("yaw", 20.0), _mware=self.MWARE, ) if move_robot_head.get("reset_gaze", False): self.reset_gaze() self.control_head_gaze( pitch=move_robot_head.get("pitch", 0.0), roll=move_robot_head.get("roll", 0.0), yaw=move_robot_head.get("yaw", 0.0), _mware=self.MWARE, ) # move robot eyes (move_robot_eyes,) = self.acquire_eye_coordinates( eye_coordinates_port=self.EYE_COORDINATES_PORT, cv2_key=k, _mware=self.MWARE ) if move_robot_eyes is not None and isinstance(move_robot_eyes, dict): robot_eye_speed = move_robot_eyes.get("speed", False) if robot_eye_speed and isinstance(robot_eye_speed, dict): self.update_eye_gaze_speed( pitch=robot_eye_speed.get("pitch", 10.0), yaw=robot_eye_speed.get("yaw", 10.0), vergence=robot_eye_speed.get("vergence", 20.0), _mware=self.MWARE, ) if move_robot_eyes.get("reset_gaze", False): self.reset_gaze() self.control_eye_gaze( pitch=move_robot_eyes.get("pitch", 0.0), yaw=move_robot_eyes.get("yaw", 0.0), vergence=move_robot_eyes.get("vergence", 0.0), _mware=self.MWARE, ) if move_robot_head is not None or move_robot_eyes is not None: self._control_head_eye_gaze() (move_robot,) = self.receive_gaze_plane_coordinates( gaze_plane_coordinates_port=self.GAZE_PLANE_COORDINATES_PORT, _mware=self.MWARE, ) if move_robot is not None and isinstance(move_robot, dict): robot_eye_speed = move_robot.get("eye_speed", False) if robot_eye_speed and isinstance(robot_eye_speed, dict): self.update_eye_gaze_speed( **( { "pitch": robot_eye_speed.get("pitch", 10.0), "yaw": robot_eye_speed.get("yaw", 10.0), "vergence": robot_eye_speed.get("vergence", 20.0), "_mware": self.MWARE, } if not self.ikingaze else { "eye": robot_eye_speed.get("eye", 0.5), "_mware": self.MWARE, } ) ) robot_head_speed = move_robot.get("head_speed", False) if robot_head_speed and isinstance(robot_head_speed, dict): self.update_head_gaze_speed( **( { "pitch": robot_head_speed.get("pitch", 10.0), "roll": robot_head_speed.get("roll", 10.0), "yaw": robot_head_speed.get("yaw", 20.0), "_mware": self.MWARE, } if not self.ikingaze else { "head": robot_head_speed.get("head", 0.8), "_mware": self.MWARE, } ) ) if move_robot.get("reset_gaze", False): self.reset_gaze() self.control_gaze_at_plane( x=move_robot.get("x", 0.0), y=move_robot.get("y", 0.0), limit_x=move_robot.get("limit_x", 0.3), limit_y=move_robot.get("limit_y", 0.3), control_head=move_robot.get( "control_head", False if not self.ikingaze else True ), control_eyes=move_robot.get("control_eyes", True), _mware=self.MWARE, ), return True
[docs] def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--simulation", action="store_true", help="Run in simulation") parser.add_argument("--headless", action="store_true", help="Disable CV2 GUI") parser.add_argument("--ikingaze", action="store_true", help="Enable iKinGazeCtrl") parser.add_argument( "--get_cam_feed", action="store_true", help="Get the camera feeds from the robot", ) parser.add_argument("--control_head", action="store_true", help="Control the head") parser.add_argument( "--set_head_coordinates", action="store_true", help="Publish head coordinates set using keyboard commands", ) parser.add_argument( "--head_coordinates_port", type=str, default="", help="The port (topic) name used for receiving and transmitting head orientation " "Setting the port name without --set_head_coordinates will only receive the coordinates", ) parser.add_argument("--control_eyes", action="store_true", help="Control the eyes") parser.add_argument( "--set_eye_coordinates", action="store_true", help="Publish eye coordinates set using keyboard commands", ) parser.add_argument( "--eye_coordinates_port", type=str, default="", help="The port (topic) name used for receiving and transmitting eye orientation " "Setting the port name without --set_eye_coordinates will only receive the coordinates", ) parser.add_argument( "--gaze_plane_coordinates_port", type=str, default="", help="The port (topic) name used for receiving plane coordinates in 2D for robot to look at", ) parser.add_argument( "--control_expressions", action="store_true", help="Control the facial expressions", ) parser.add_argument( "--set_facial_expressions", action="store_true", help="Publish facial expressions set using keyboard commands", ) parser.add_argument( "--facial_expressions_port", type=str, default="", help="The port (topic) name used for receiving and transmitting facial expressions. " "Setting the port name without --set_facial_expressions will only receive the facial expressions", ) parser.add_argument( "--mware", type=str, default=ICUB_DEFAULT_COMMUNICATOR, help="The middleware used for communication. " "This can be overriden by providing either of the following environment variables " "{WRAPYFI_DEFAULT_COMMUNICATOR, WRAPYFI_DEFAULT_MWARE, " "ICUB_DEFAULT_COMMUNICATOR, ICUB_DEFAULT_MWARE}. Defaults to 'yarp'", choices=MiddlewareCommunicator.get_communicators(), ) return parser.parse_args()
if __name__ == "__main__": args = parse_args() assert not ( args.headless and (args.set_facial_expressions or args.set_head_eye_coordinates) ), "setters require a CV2 window for capturing keystrokes. Disable --set-... for running in headless mode" controller = ICub(**vars(args)) controller.runModule()