Source code for wrapyfi.tests.test_middleware

import unittest
import multiprocessing
from multiprocessing import Queue
import queue


[docs] class ZeroMQTestMiddleware(unittest.TestCase): MWARE = "zeromq"
[docs] def test_publish_listen(self): """ Test the publish and listen functionality of the middleware. This verifies that the middleware can send and receive messages using the PUB/SUB pattern. """ import wrapyfi.tests.tools.class_test as class_test # importlib.reload(wrapyfi) # importlib.reload(class_test) test_func = class_test.test_func Test = class_test.Test Test.close_all_instances() if self.MWARE not in class_test.Test.get_communicators(): self.skipTest(f"{self.MWARE} not installed") listen_queue = Queue(maxsize=10) test_lsn = multiprocessing.Process( target=test_func, args=(listen_queue,), kwargs={ "mode": "listen", "mware": self.MWARE, "iterations": 10, "should_wait": True, }, ) # test_lsn.daemon = True publish_queue = Queue(maxsize=10) test_pub = multiprocessing.Process( target=test_func, args=(publish_queue,), kwargs={ "mode": "publish", "mware": self.MWARE, "iterations": 10, "should_wait": True, }, ) # test_pub.daemon = True test_lsn.start() test_pub.start() test_lsn.join() test_pub.join() for i in range(10): try: self.assertDictEqual( listen_queue.get(timeout=3), publish_queue.get(timeout=3) ) except queue.Empty: self.assertEqual(i, 9)
[docs] class ROS2TestMiddleware(ZeroMQTestMiddleware): """ Test the ROS 2 wrapper. This test class inherits from the ZeroMQ test class, so all tests from the ZeroMQ test class are also run for the ROS 2 wrapper. """ MWARE = "ros2"
[docs] class YarpTestMiddleware(ZeroMQTestMiddleware): """ Test the YARP wrapper. This test class inherits from the ZeroMQ test class, so all tests from the ZeroMQ test class are also run for the YARP wrapper. """ MWARE = "yarp"
[docs] class ROSTestMiddleware(ZeroMQTestMiddleware): """ Test the ROS wrapper. This test class inherits from the ZeroMQ test class, so all tests from the ZeroMQ test class are also run for the ROS wrapper. """ MWARE = "ros"
if __name__ == "__main__": unittest.main()