# Source code for examples.applications.affective_signaling_multirobot

import argparse

import cv2

from wrapyfi.connect.wrapper import MiddlewareCommunicator, DEFAULT_COMMUNICATOR
from wrapyfi.config.manager import ConfigManager


class ExperimentController(MiddlewareCommunicator):
    """Relay camera frames and facial-expression messages between endpoints.

    Every method below is registered with ``MiddlewareCommunicator.register``.
    The method bodies only build the value to hand to the communicator:
    ``listen_*`` methods return a ``(None,)`` placeholder, while ``publish_*``
    and ``forward_*`` methods return the object/image they were given
    (optionally post-processed). Arguments prefixed with ``_`` are referenced
    by the decorators through the ``"$_name"`` placeholders.
    """

    # Default frame sizes in pixels; used as the ``_width``/``_height``
    # defaults of the image methods below.
    WEBCAM_HEIGHT = 240
    WEBCAM_WIDTH = 320
    PEPPER_CAM_HEIGHT = 480
    PEPPER_CAM_WIDTH = 640
    ICUB_CAM_HEIGHT = 240
    ICUB_CAM_WIDTH = 320
    ESR_CAM_HEIGHT = 240
    ESR_CAM_WIDTH = 320

    def __init__(self, **kwargs):
        """Initialise the communicator base class.

        :param kwargs: Accepted for call-site compatibility; not used here.
        """
        super().__init__()

    @MiddlewareCommunicator.register(
        "NativeObject",
        "$_mware",
        "ExperimentController",
        "/control_interface/facial_expressions_esr9",
        should_wait=False,
    )
    def listen_facial_expressions_esr9(self, _mware=DEFAULT_COMMUNICATOR):
        """Endpoint for ``/control_interface/facial_expressions_esr9``.

        :param _mware: Middleware substituted for ``"$_mware"``.
        :return: ``(None,)`` placeholder.
        """
        return (None,)

    @MiddlewareCommunicator.register(
        "NativeObject",
        "$_mware",
        "ExperimentController",
        "/control_interface/facial_expressions_icub",
        should_wait=False,
    )
    def publish_facial_expressions_icub(self, obj, _mware="yarp"):
        """Endpoint for ``/control_interface/facial_expressions_icub``.

        :param obj: Facial-expression object to pass on unchanged.
        :param _mware: Middleware substituted for ``"$_mware"``.
        :return: ``(obj,)``.
        """
        return (obj,)

    @MiddlewareCommunicator.register(
        "NativeObject",
        "$_mware",
        "ExperimentController",
        "/control_interface/facial_expressions_pepper",
        carrier="tcp",
        should_wait=False,
    )
    def publish_facial_expressions_pepper(self, obj, _mware="ros"):
        """Endpoint for ``/control_interface/facial_expressions_pepper``.

        :param obj: Facial-expression object to pass on unchanged.
        :param _mware: Middleware substituted for ``"$_mware"``.
        :return: ``(obj,)``.
        """
        return (obj,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "$_topic",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        should_wait=False,
        jpg=True,
    )
    def listen_image_webcam(
        self,
        _mware=DEFAULT_COMMUNICATOR,
        _width=WEBCAM_WIDTH,
        _height=WEBCAM_HEIGHT,
        _topic="/control_interface/image_webcam",
    ):
        """Webcam image endpoint; the topic is supplied per call via ``_topic``.

        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Image width substituted for ``"$_width"``.
        :param _height: Image height substituted for ``"$_height"``.
        :param _topic: Topic substituted for ``"$_topic"`` (dynamic topic
            according to the corresponding API).
        :return: ``(None,)`` placeholder.
        """
        return (None,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "$_topic",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        should_wait=False,
    )
    def listen_image_pepper_cam(
        self,
        _mware="ros",
        _width=PEPPER_CAM_WIDTH,
        _height=PEPPER_CAM_HEIGHT,
        _topic="/pepper/camera/front/camera/image_raw",
    ):
        """Pepper camera image endpoint.

        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Image width substituted for ``"$_width"``.
        :param _height: Image height substituted for ``"$_height"``.
        :param _topic: Topic substituted for ``"$_topic"``.
        :return: ``(None,)`` placeholder.
        """
        return (None,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "$_topic",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        should_wait=False,
    )
    def listen_image_icub_cam(
        self,
        _mware="yarp",
        _width=ICUB_CAM_WIDTH,
        _height=ICUB_CAM_HEIGHT,
        _topic="/icub/cam/right",
    ):
        """iCub camera image endpoint.

        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Image width substituted for ``"$_width"``.
        :param _height: Image height substituted for ``"$_height"``.
        :param _topic: Topic substituted for ``"$_topic"``.
        :return: ``(None,)`` placeholder.
        """
        return (None,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "/control_interface/image_webcam",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        should_wait=False,
    )
    def forward_image_webcam(
        self,
        img,
        _mware=DEFAULT_COMMUNICATOR,
        _width=WEBCAM_WIDTH,
        _height=WEBCAM_HEIGHT,
    ):
        """Pass a webcam frame to ``/control_interface/image_webcam``.

        :param img: Frame to forward (may be ``None``).
        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Image width substituted for ``"$_width"``.
        :param _height: Image height substituted for ``"$_height"``.
        :return: ``(img,)`` unchanged.
        """
        return (img,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "/control_interface/image_pepper_cam",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        jpg=True,
        should_wait=False,
    )
    def forward_image_pepper_cam(
        self,
        img,
        _mware=DEFAULT_COMMUNICATOR,
        _width=PEPPER_CAM_WIDTH,
        _height=PEPPER_CAM_HEIGHT,
    ):
        """Pass a Pepper frame to ``/control_interface/image_pepper_cam``.

        :param img: Frame to forward (may be ``None``).
        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Image width substituted for ``"$_width"``.
        :param _height: Image height substituted for ``"$_height"``.
        :return: ``(img,)`` unchanged.
        """
        return (img,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "/control_interface/image_icub_cam",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        jpg=True,
        should_wait=False,
    )
    def forward_image_icub_cam(
        self,
        img,
        _mware=DEFAULT_COMMUNICATOR,
        _width=ICUB_CAM_WIDTH,
        _height=ICUB_CAM_HEIGHT,
    ):
        """Pass an iCub frame to ``/control_interface/image_icub_cam``.

        :param img: Frame to forward (may be ``None``).
        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Image width substituted for ``"$_width"``.
        :param _height: Image height substituted for ``"$_height"``.
        :return: ``(img,)`` with its first and third channels swapped, or
            ``(None,)`` when no frame was given.
        """
        if img is not None:
            # Swap the red and blue channels; the BGR<->RGB conversion is the
            # same operation in either direction.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return (img,)

    @MiddlewareCommunicator.register(
        "Image",
        "$_mware",
        "ExperimentController",
        "/control_interface/image_esr9",
        width="$_width",
        height="$_height",
        rgb=True,
        fp=False,
        should_wait=False,
        jpg=True,
    )
    def publish_esr9_cam(
        self,
        img,
        _mware=DEFAULT_COMMUNICATOR,
        _width=ESR_CAM_WIDTH,
        _height=ESR_CAM_HEIGHT,
    ):
        """Resize a frame and pass it to ``/control_interface/image_esr9``.

        :param img: Frame to resize and hand over (may be ``None``).
        :param _mware: Middleware substituted for ``"$_mware"``.
        :param _width: Target/declared image width (``"$_width"``).
        :param _height: Target/declared image height (``"$_height"``).
        :return: ``(img,)`` resized to ``_width`` x ``_height``, or
            ``(None,)`` when no frame was given.
        """
        if img is not None:
            # Resize to the same dimensions that are declared to the
            # communicator, so a non-default ``_width``/``_height`` cannot
            # disagree with the actual frame. ``cv2.resize`` takes the target
            # size as (width, height).
            img = cv2.resize(img, (_width, _height))
        return (img,)
def parse_args():
    """Build the command-line parser for this script and parse ``sys.argv``.

    :return: Namespace with ``wrapyfi_cfg`` (``str`` or ``None``) and
        ``cam_source`` (one of ``"webcam"``, ``"pepper"``, ``"icub"``).
    """
    cfg_help = (
        "File to load Wrapyfi configs for running instance. "
        "Choose one of the configs available in "
        "./wrapyfi_configs/affective_signaling_multirobot "
        "for each running instance. All configs with a prefix of COMP must "
        "have corresponding instances running. OPT prefixed configs execute "
        "scripts on a machine connected to either robot (Pepper/iCub) or both, "
        "and at least one must run in addition to COMP."
    )
    cam_help = (
        "The camera input source being either from a "
        "webcam, Pepper, or iCub. Note that this must be similar for all running "
        "instances even when the two robots are set to display the expressions."
    )
    camera_sources = ["webcam", "pepper", "icub"]

    cli = argparse.ArgumentParser()
    cli.add_argument("--wrapyfi_cfg", type=str, help=cfg_help)
    cli.add_argument(
        "--cam_source",
        type=str,
        choices=camera_sources,
        default="webcam",
        help=cam_help,
    )
    return cli.parse_args()
if __name__ == "__main__":
    # Script entry point: optionally load a Wrapyfi config, then loop forever
    # relaying frames from the selected camera and facial expressions.
    args = parse_args()
    if args.wrapyfi_cfg:
        print("Wrapyfi config loading from ", args.wrapyfi_cfg)
        ConfigManager(args.wrapyfi_cfg)

    ec = ExperimentController()

    # cam_source -> (listener, forwarder, preview window title,
    #                preview the forwarded frame instead of the selected one)
    camera_pipelines = {
        "webcam": (
            ec.listen_image_webcam,
            ec.forward_image_webcam,
            "Webcam image",
            False,
        ),
        "pepper": (
            ec.listen_image_pepper_cam,
            ec.forward_image_pepper_cam,
            "Pepper image",
            False,
        ),
        # The iCub preview shows the frame returned by the forwarder.
        "icub": (
            ec.listen_image_icub_cam,
            ec.forward_image_icub_cam,
            "iCub image",
            True,
        ),
    }
    pipeline = camera_pipelines.get(args.cam_source)

    image_cam = None
    while True:
        if pipeline is not None:
            listen, forward, window_title, preview_forwarded = pipeline
            (received,) = listen()
            (forwarded,) = forward(received)
            # Fall back to the forwarder's result when nothing was received.
            image_cam = forwarded if received is None else received
            preview = forwarded if preview_forwarded else image_cam
            if preview is not None:
                cv2.imshow(window_title, preview)
                cv2.waitKey(1)

        (_,) = ec.publish_esr9_cam(image_cam)

        (expression,) = ec.listen_facial_expressions_esr9()
        if expression is None:
            continue
        ec.publish_facial_expressions_icub(expression)
        ec.publish_facial_expressions_pepper(expression)