Source code for wrapyfi.standalone.zeromq_proxy_broker

import argparse
import logging

logging.getLogger().setLevel(logging.INFO)

import zmq

from wrapyfi.middlewares.zeromq import ZeroMQMiddlewareReqRep, ZeroMQMiddlewarePubSub


def _idle_forever(poll_interval=1.0):
    """Block the calling process indefinitely without consuming CPU.

    Replaces a ``while True: pass`` spin loop; sleeping in short intervals
    keeps the process alive while still reacting to ``KeyboardInterrupt``.

    Args:
        poll_interval: Seconds to sleep between wake-ups.
    """
    # Function-scope import: the module header does not import ``time``.
    import time

    while True:
        time.sleep(poll_interval)


def _run_polling_proxy(xpub_addr, xsub_addr):
    """Forward messages between an XPUB and an XSUB socket using a poller.

    Binds an XPUB socket to ``xpub_addr`` and an XSUB socket to ``xsub_addr``
    and relays every multipart message received on one to the other. Runs
    until an exception (e.g. ``KeyboardInterrupt``) is raised; the exception
    propagates after the ZeroMQ context and its sockets have been released.

    Args:
        xpub_addr: Address the XPUB socket binds to.
        xsub_addr: Address the XSUB socket binds to.
    """
    context = zmq.Context()
    try:
        xpub_socket = context.socket(zmq.XPUB)
        xpub_socket.bind(xpub_addr)
        xsub_socket = context.socket(zmq.XSUB)
        xsub_socket.bind(xsub_addr)

        poller = zmq.Poller()
        poller.register(xpub_socket, zmq.POLLIN)
        poller.register(xsub_socket, zmq.POLLIN)
        logging.info("[ZeroMQ] Intialising PUB/SUB device broker")

        while True:
            # Wait up to one second for activity on either socket.
            events = dict(poller.poll(1000))
            if xpub_socket in events:
                xsub_socket.send_multipart(xpub_socket.recv_multipart())
            if xsub_socket in events:
                xpub_socket.send_multipart(xsub_socket.recv_multipart())
    finally:
        # Close every socket created from this context and terminate it;
        # linger=0 discards unsent messages so shutdown cannot hang.
        context.destroy(linger=0)


def main(
    comm_type,
    socket_ip,
    socket_pub_port,
    socket_sub_port,
    socket_rep_port,
    socket_req_port,
    **kwargs,
):
    """Start a standalone ZeroMQ proxy broker and block until interrupted.

    Args:
        comm_type: Communication pattern to serve: ``"pubsub"``,
            ``"reqrep"`` or ``"pubsubpoll"`` (poller-based XPUB/XSUB relay
            intended for debugging).
        socket_ip: IP address used to build the ``tcp://`` socket addresses.
        socket_pub_port: Port of the publishing (XPUB) side.
        socket_sub_port: Port of the subscription (XSUB) side.
        socket_rep_port: Port of the reply side (``"reqrep"`` only).
        socket_req_port: Port of the request side (``"reqrep"`` only).
        **kwargs: Extra keyword arguments forwarded to the middleware's
            ``activate`` call (unused by ``"pubsubpoll"``).

    Raises:
        ValueError: If ``comm_type`` is not one of the supported patterns.
    """
    if comm_type == "pubsub":
        ZeroMQMiddlewarePubSub.activate(
            socket_pub_address=f"tcp://{socket_ip}:{socket_pub_port}",
            socket_sub_address=f"tcp://{socket_ip}:{socket_sub_port}",
            proxy_broker_spawn="process",
            verbose=True,
            **kwargs,
        )
        # NOTE(review): presumably the broker runs in a separate process
        # (proxy_broker_spawn="process") and this process only has to stay
        # alive -- confirm against the middleware implementation.
        _idle_forever()
    elif comm_type == "reqrep":
        ZeroMQMiddlewareReqRep.activate(
            socket_rep_address=f"tcp://{socket_ip}:{socket_rep_port}",
            socket_req_address=f"tcp://{socket_ip}:{socket_req_port}",
            proxy_broker_spawn="process",
            verbose=True,
            **kwargs,
        )
        _idle_forever()
    elif comm_type == "pubsubpoll":
        # Debugging the xpub/xsub pair with a poller.
        _run_polling_proxy(
            f"tcp://{socket_ip}:{socket_pub_port}",
            f"tcp://{socket_ip}:{socket_sub_port}",
        )
    else:
        raise ValueError(
            f"Unsupported comm_type {comm_type!r}; "
            "expected 'pubsub', 'reqrep' or 'pubsubpoll'"
        )
def parse_args(argv=None):
    """Parse the broker's command-line options.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default), ``argparse`` reads ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: With attributes ``socket_ip``,
        ``socket_pub_port``, ``socket_sub_port``, ``socket_rep_port``,
        ``socket_req_port`` and ``comm_type``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--socket_ip", type=str, default="127.0.0.1", help="Socket IP address"
    )

    # All port options share the same shape: an integer with a default.
    port_options = (
        ("--socket_pub_port", 5555, "Socket publishing port"),
        ("--socket_sub_port", 5556, "Socket subscription port"),
        ("--socket_rep_port", 5559, "Socket reply port"),
        ("--socket_req_port", 5560, "Socket request port"),
    )
    for flag, default_port, description in port_options:
        parser.add_argument(flag, type=int, default=default_port, help=description)

    parser.add_argument(
        "--comm_type",
        type=str,
        default="pubsub",
        choices=["pubsub", "pubsubpoll", "reqrep"],
        help="The zeromq communication pattern",
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: parse the CLI options and pass them to main()
    # as keyword arguments.
    main(**vars(parse_args()))