Source code for examples.custom_msgs.ros_message_example

"""
A message publisher and listener for ROS messages using Wrapyfi.

This script demonstrates the capability to transmit ROS messages, specifically
geometry_msgs/Pose and std_msgs/String, using the MiddlewareCommunicator within
the Wrapyfi library. The communication follows the PUB/SUB pattern allowing
message publishing and listening functionalities between processes or machines.

Demonstrations:
    - Using the ROS message
    - Transmitting std_msgs/String and geometry_msgs/Pose ROS messages
    - Applying the PUB/SUB pattern with channeling and mirroring

Requirements:
    - Wrapyfi: Middleware communication wrapper (refer to the Wrapyfi documentation for installation instructions)
    - ROS, ROSPy: Used for handling and creating ROS messages (refer to the Wrapyfi documentation for installation instructions)

    Ensure ROS is installed and the required message types are available.

Run:
    # On machine 1 (or process 1): Publisher waits for keyboard input and transmits message
        ``python3 ros_message_example.py --mode publish``

    # On machine 2 (or process 2): Listener waits for message and prints the received ROS messages
        ``python3 ros_message_example.py --mode listen``
"""

import argparse

from geometry_msgs.msg import Pose, Quaternion
from std_msgs.msg import String

from wrapyfi.connect.wrapper import MiddlewareCommunicator


[docs] class Notifier(MiddlewareCommunicator):
[docs] @MiddlewareCommunicator.register( "ROSMessage", "ros", "Notifier", "/notify/test_ros_msg_str_exchange", should_wait=True, ) @MiddlewareCommunicator.register( "ROSMessage", "ros", "Notifier", "/notify/test_ros_msg_pose_exchange", should_wait=True, ) def send_message(self): """ Exchange ROS messages over ROS. """ msg = input("Type your message: ") quat = Quaternion() quat.x = 0.1 quat.y = -0.1 quat.z = 0.8 quat.z = -0.8 ret_str = String(msg) ret_pose = Pose() ret_pose.orientation = quat return ret_str, ret_pose
[docs] def parse_args(): """ Parse command line arguments. """ parser = argparse.ArgumentParser( description="A message publisher and listener for ROS messages using Wrapyfi." ) parser.add_argument( "--mode", type=str, default="publish", choices={"publish", "listen"}, help="The transmission mode", ) return parser.parse_args()
[docs] def main(args): """ Main function to initiate Notify class and communication. """ ros_message = Notifier() ros_message.activate_communication(Notifier.send_message, mode=args.mode) while True: my_string_message, my_pose_message = ros_message.send_message() if my_string_message is not None or my_pose_message is not None: print("Method result:", my_string_message, my_pose_message)
if __name__ == "__main__": args = parse_args() main(args)