Source code for examples.encoders.zarr_example

"""
A message publisher and listener for native Python objects and Zarr arrays/groups.

This script demonstrates the capability to transmit native Python objects and Zarr arrays/groups using
the MiddlewareCommunicator within the Wrapyfi library. The communication follows the PUB/SUB pattern
allowing message publishing and listening functionalities between processes or machines.

Demonstrations:
    - Using the NativeObject message
    - Transmitting a nested dummy Python object with native objects and Zarr arrays/groups
    - Applying the PUB/SUB pattern with mirroring

Requirements:
    - Wrapyfi: Middleware communication wrapper (refer to the Wrapyfi documentation for installation instructions)
    - YARP, ROS, ROS 2, ZeroMQ (refer to the Wrapyfi documentation for installation instructions)
    - NumPy: Used for creating arrays (installed with Wrapyfi)
    - zarr: Used for creating and handling Zarr arrays and groups

    Install using pip:
        ``pip install zarr``

Run:
    # On machine 1 (or process 1): Publisher waits for keyboard input and transmits message
        ``python3 zarr_example.py --mode publish``

    # On machine 2 (or process 2): Listener waits for message and prints the entire dummy object
        ``python3 zarr_example.py --mode listen``
"""

import argparse

try:
    import numpy as np
    import zarr
except ImportError:
    print("Install zarr and NumPy before running this script.")

from wrapyfi.connect.wrapper import MiddlewareCommunicator, DEFAULT_COMMUNICATOR


[docs] class Notifier(MiddlewareCommunicator):
[docs] @MiddlewareCommunicator.register( "NativeObject", "$mware", "Notifier", "/notify/test_zarr_exchange", carrier="tcp", should_wait=True, ) def exchange_object(self, mware=None): """ Exchange messages with Zarr arrays/groups and other native Python objects. """ msg = input("Type your message: ") # Creating an example Zarr Array zarray = zarr.array(np.random.random((10, 10)), chunks=(5, 5)) # Creating an example Zarr Group zgroup = zarr.group() zgroup.create_dataset("dataset1", data=np.random.randint(0, 100, 50), chunks=10) zgroup.create_dataset("dataset2", data=np.random.random(100), chunks=10) ret = { "message": msg, "zarr_array": zarray, "zarr_group": zgroup, } return (ret,)
[docs] def parse_args(): """ Parse command line arguments. """ parser = argparse.ArgumentParser( description="A message publisher and listener for native Python objects and Zarr arrays/groups." ) parser.add_argument( "--mode", type=str, default="publish", choices={"publish", "listen"}, help="The transmission mode", ) parser.add_argument( "--mware", type=str, default=DEFAULT_COMMUNICATOR, choices=MiddlewareCommunicator.get_communicators(), help="The middleware to use for transmission", ) return parser.parse_args()
[docs] def main(args): """ Main function to initiate Notifier class and communication. """ notifier = Notifier() notifier.activate_communication(Notifier.exchange_object, mode=args.mode) while True: (msg_object,) = notifier.exchange_object(mware=args.mware) print("Method result:", msg_object)
if __name__ == "__main__": args = parse_args() main(args)