import logging
import atexit
import threading
import multiprocessing
import time
from collections import defaultdict
import json
from typing import Optional
import zmq
from wrapyfi.utils.core_utils import SingletonOptimized
from wrapyfi.connect.wrapper import MiddlewareCommunicator
ZEROMQ_POST_OPTS = [
"SUBSCRIBE",
"UNSUBSCRIBE",
"LINGER",
"ROUTER_HANDOVER",
"ROUTER_MANDATORY",
"PROBE_ROUTER",
"XPUB_VERBOSE",
"XPUB_VERBOSER",
"REQ_CORRELATE",
"REQ_RELAXED",
"SNDHWM",
"RCVHWM",
]
[docs]
class ZeroMQMiddlewarePubSub(object):
"""
ZeroMQ PUB/SUB middleware wrapper. This class is a singleton, so it can be instantiated only once. The ``activate``
method should be called to initialize the middleware. The ``deinit`` method should be called to deinitialize the
middleware and destroy all connections. The ``activate`` and ``deinit`` methods are automatically called when the
class is instantiated and when the program exits, respectively.
"""
[docs]
class ZeroMQSharedMonitorData:
"""
Shared data class for the ZeroMQ PUB/SUB monitor. This class is used to share data between the main process and
the monitor listener process/thread.
"""
[docs]
def __init__(self, use_multiprocessing: bool = False):
"""
Initialize the shared data class.
:param use_multiprocessing: bool: Whether to use multiprocessing or threading
"""
self.use_multiprocessing = use_multiprocessing
if use_multiprocessing:
self.manager = multiprocessing.Manager()
self.shared_topics = self.manager.list()
self.shared_connections = self.manager.dict()
self.lock = self.manager.Lock()
else:
self.shared_topics = []
self.shared_connections = {}
self.lock = threading.Lock()
[docs]
def add_topic(self, topic: str):
"""
Add a topic to the shared data class.
:param topic: str: The topic to add
"""
with self.lock:
self.shared_topics.append(topic)
[docs]
def remove_topic(self, topic: str):
"""
Remove a topic from the shared data class.
:param topic: str: The topic to remove
"""
try:
with self.lock:
if topic in self.shared_topics:
self.shared_topics.remove(topic)
except (FileNotFoundError, EOFError):
if self.use_multiprocessing:
# TODO(fabawi): this is can break in many ways, and shutting down the topic monitor is not the right solution, since all topics will be affected
self.manager.shutdown()
[docs]
def get_topics(self):
"""
Get the list of topics in the shared data class.
:return: list: The list of topics
"""
with self.lock:
return list(self.shared_topics)
[docs]
def update_connection(self, topic: str, data: dict):
"""
Update the connection data for a topic, e.g. the number of subscribers.
:param topic: str: The topic to update
:param data: dict: The connection data
"""
with self.lock:
self.shared_connections[topic] = data
[docs]
def remove_connection(self, topic: str):
"""
Remove the connection data for a topic.
:param topic: str: The topic to remove
"""
with self.lock:
if topic in list(self.shared_connections.keys()):
del self.shared_connections[topic]
[docs]
def get_connections(self):
"""
Get the connection data for all topics.
:return: dict: The connection data for all topics
"""
with self.lock:
return dict(self.shared_connections)
[docs]
def is_connected(self, topic: str):
"""
Check whether a topic is connected.
:param topic: str: The topic to check
"""
with self.lock:
if topic in list(self.shared_connections.keys()):
return True
else:
return False
[docs]
@staticmethod
def activate(**kwargs):
"""
Activate the ZeroMQ PUB/SUB middleware. This method should be called to initialize the middleware.
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
zeromq_post_kwargs = {}
zeromq_pre_kwargs = {}
for key, value in kwargs.items():
try:
getattr(zmq, key)
if key in ZEROMQ_POST_OPTS:
zeromq_post_kwargs[key] = value
else:
zeromq_pre_kwargs[key] = value
except AttributeError:
pass
ZeroMQMiddlewarePubSub(
zeromq_proxy_kwargs=kwargs,
zeromq_post_kwargs=zeromq_post_kwargs,
**zeromq_pre_kwargs,
)
[docs]
def __init__(
self,
zeromq_proxy_kwargs: Optional[dict] = None,
zeromq_post_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Initialize the ZeroMQ PUB/SUB middleware. This method is automatically called when the class is instantiated.
:param zeromq_proxy_kwargs: Optional[dict]: Keyword arguments to be passed to the ZeroMQ proxy initialization function
:param zeromq_post_kwargs: Optional[dict]: Keyword arguments to be passed to the ZeroMQ initialization function (these are ZeroMQ options)
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
self.zeromq_proxy_kwargs = zeromq_proxy_kwargs or {}
self.zeromq_kwargs = zeromq_post_kwargs or {}
logging.info("Initialising ZeroMQ PUB/SUB middleware")
self.ctx = zmq.Context.instance()
for socket_property in kwargs.items():
if isinstance(socket_property[1], str):
self.ctx.setsockopt_string(
getattr(zmq, socket_property[0]), socket_property[1]
)
else:
self.ctx.setsockopt(
getattr(zmq, socket_property[0]), socket_property[1]
)
atexit.register(MiddlewareCommunicator.close_all_instances)
atexit.register(self.deinit)
# start the pubsub proxy and monitor
if zeromq_proxy_kwargs is not None and zeromq_proxy_kwargs:
# start the pubsub monitor listener
if zeromq_proxy_kwargs.get("start_pubsub_monitor_broker", False):
if zeromq_proxy_kwargs["pubsub_monitor_listener_spawn"] == "process":
self.shared_monitor_data = self.ZeroMQSharedMonitorData(
use_multiprocessing=True
)
self.monitor = multiprocessing.Process(
name="zeromq_pubsub_monitor_listener",
target=self.__init_monitor_listener,
kwargs=zeromq_proxy_kwargs,
)
self.monitor.daemon = True
self.monitor.start()
else: # if threaded
self.shared_monitor_data = self.ZeroMQSharedMonitorData(
use_multiprocessing=False
)
self.monitor = threading.Thread(
name="pubsub_monitor_listener_spawn",
target=self.__init_monitor_listener,
kwargs=zeromq_proxy_kwargs,
)
self.monitor.setDaemon(
True
) # deprecation warning Python3.10. Previous Python versions only support this
self.monitor.start()
time.sleep(1)
if zeromq_proxy_kwargs.get("start_proxy_broker", False):
if zeromq_proxy_kwargs["proxy_broker_spawn"] == "process":
self.proxy = multiprocessing.Process(
name="zeromq_pubsub_broker",
target=self.__init_proxy,
kwargs=zeromq_proxy_kwargs,
)
self.proxy.daemon = True
self.proxy.start()
else: # if threaded
self.proxy = threading.Thread(
name="zeromq_pubsub_broker",
target=self.__init_proxy,
kwargs=zeromq_proxy_kwargs,
)
self.proxy.setDaemon(
True
) # deprecation warning Python3.10. Previous Python versions only support this
self.proxy.start()
pass
[docs]
@staticmethod
def proxy_thread(
socket_pub_address: str = "tcp://127.0.0.1:5555",
socket_sub_address: str = "tcp://127.0.0.1:5556",
inproc_address: str = "inproc://monitor",
):
"""
Proxy thread for the ZeroMQ PUB/SUB proxy.
:param socket_pub_address: str: The address of the PUB socket
:param socket_sub_address: str: The address of the SUB socket
:param inproc_address: str: The address of the inproc socket (connections within the same process, for exchanging subscription data between the proxy and the monitor)
"""
context = zmq.Context.instance()
xpub = context.socket(zmq.XPUB)
xsub = context.socket(zmq.XSUB)
xpub.setsockopt(zmq.XPUB_VERBOSE, 1)
xpub.bind(socket_pub_address)
xsub.bind(socket_sub_address)
monitor = context.socket(zmq.PUB)
monitor.bind(inproc_address)
try:
zmq.proxy(xpub, xsub, monitor)
except Exception as e:
# WORKAROUND(fabawi): The errno in ZMQERROR changes arbitrarily but the message seems consistent. According
# to https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context when ctx.destroy() is called, and the
# context is created in another thread, the operation is unsafe. If we want to maintain daemonized spawning
# of brokers, we have to apply this workaround. There is no way to let the proxy know that the context has
# been destroyed, so we let it crash. This only happens when the spawning method is `thread`; `process`
# (default method) will hang forever. This can be done by setting the environment variables
# WRAPYFI_ZEROMQ_PROXY_BROKER_SPAWN=thread WRAPYFI_ZEROMQ_PUBSUB_MONITOR_LISTENER_SPAWN=thread
if str(e) == "Socket operation on non-socket":
pass
else:
logging.error(
f"[ZeroMQ BROKER] An error occurred in the ZeroMQ proxy: {str(e)}."
)
[docs]
@staticmethod
def subscription_monitor_thread(
inproc_address: str = "inproc://monitor",
socket_sub_address: str = "tcp://127.0.0.1:5556",
pubsub_monitor_topic: str = "ZEROMQ/CONNECTIONS",
verbose: bool = False,
):
"""
Subscription monitor thread for the ZeroMQ PUB/SUB proxy.
:param inproc_address: str: The address of the inproc socket (connections within the same process, for exchanging subscription data between the proxy and the monitor)
:param socket_sub_address: str: The address of the SUB socket
:param pubsub_monitor_topic: str: The topic to use for publishing subscription data
:param verbose: bool: Whether to print debug messages
"""
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
subscriber.connect(inproc_address)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
# pub socket to publish subscriber counts.
publisher = context.socket(zmq.PUB)
publisher.connect(socket_sub_address)
topic_subscriber_count = defaultdict(int)
while True:
time.sleep(1)
try:
message = subscriber.recv()
if verbose:
logging.info(f"[ZeroMQ BROKER] Raw message: {message}")
# ensure the message is a subscription/unsubscription message
if len(message) > 1 and (message[0] == 1 or message[0] == 0):
event = message[0]
topic = message[1:].decode("utf-8")
if verbose:
logging.info(
f"[ZeroMQ BROKER] Received event: {event}, topic: {topic}"
)
# avoid processing messages on the monitor topic.
if topic == pubsub_monitor_topic:
continue
# update the count of subscribers for the topic
if event == 1: # subscribe
topic_subscriber_count[topic] += 1
elif event == 0: # unsubscribe
topic_subscriber_count[topic] = 0
if verbose:
logging.info(
f"[ZeroMQ BROKER] Current topic subscriber count: {dict(topic_subscriber_count)}"
)
# publish the updated counts
publisher.send_multipart(
[
pubsub_monitor_topic.encode(),
json.dumps(dict(topic_subscriber_count)).encode(),
]
)
except Exception as e:
logging.error(
f"[ZeroMQ BROKER] An error occurred in the ZeroMQ subscription monitor: {str(e)}"
)
def __init_proxy(
self,
socket_pub_address: str = "tcp://127.0.0.1:5555",
socket_sub_address: str = "tcp://127.0.0.1:5556",
pubsub_monitor_topic: str = "ZEROMQ/CONNECTIONS",
**kwargs,
):
"""
Initialize the ZeroMQ PUB/SUB proxy and subscription monitor.
:param socket_pub_address: str: The address of the PUB socket
:param socket_sub_address: str: The address of the SUB socket
:param pubsub_monitor_topic: str: The topic to use for publishing subscription data
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
inproc_address = "inproc://monitor"
threading.Thread(
target=self.proxy_thread,
kwargs={
"socket_pub_address": socket_pub_address,
"socket_sub_address": socket_sub_address,
"inproc_address": inproc_address,
},
).start(),
threading.Thread(
target=self.subscription_monitor_thread,
kwargs={
"socket_sub_address": socket_sub_address,
"inproc_address": inproc_address,
"pubsub_monitor_topic": pubsub_monitor_topic,
"verbose": kwargs.get("verbose", False),
},
).start()
def __init_monitor_listener(
self,
socket_pub_address: str = "tcp://127.0.0.1:5555",
pubsub_monitor_topic: str = "ZEROMQ/CONNECTIONS",
verbose: bool = False,
**kwargs,
):
"""
Initialize the ZeroMQ PUB/SUB monitor listener.
:param socket_pub_address: str: The address of the PUB socket
:param pubsub_monitor_topic: str: The topic to use for publishing subscription data
:param verbose: bool: Whether to print debug messages
"""
try:
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect(socket_pub_address)
subscriber.setsockopt_string(zmq.SUBSCRIBE, pubsub_monitor_topic)
while True:
_, message = subscriber.recv_multipart()
data = json.loads(message.decode("utf-8"))
for topic, value in data.items():
if verbose:
logging.info(f"[ZeroMQ] Topic: {topic}, Data: {value}")
# check if the topic exists in shared monitor data
if topic in self.shared_monitor_data.get_topics():
self.shared_monitor_data.update_connection(topic, value)
if value == 0:
logging.info(
f"[ZeroMQ] Subscriber disconnected from topic: {topic}"
)
self.shared_monitor_data.remove_connection(topic)
else:
logging.info(
f"[ZeroMQ] Subscriber connected to topic: {topic}"
)
if verbose:
for monitored_topic in self.shared_monitor_data.get_topics():
logging.info(
f"[ZeroMQ] Monitored topic from main process: {monitored_topic}"
)
except Exception as e:
logging.error(
f"[ZeroMQ] An error occurred in the ZeroMQ subscription monitor listener: {str(e)}"
)
[docs]
@staticmethod
def deinit():
logging.info("Deinitializing ZeroMQ middleware")
zmq.Context.instance().destroy()
[docs]
class ZeroMQMiddlewarePubSubListen(
ZeroMQMiddlewarePubSub, metaclass=SingletonOptimized
):
[docs]
@staticmethod
def activate(**kwargs):
"""
Activate the ZeroMQ PUB/SUB middleware. This method should be called to initialize the middleware.
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
zeromq_post_kwargs = {}
zeromq_pre_kwargs = {}
for key, value in kwargs.items():
try:
getattr(zmq, key)
if key in ZEROMQ_POST_OPTS:
zeromq_post_kwargs[key] = value
else:
zeromq_pre_kwargs[key] = value
except AttributeError:
pass
ZeroMQMiddlewarePubSubListen(
zeromq_proxy_kwargs=kwargs,
zeromq_post_kwargs=zeromq_post_kwargs,
**zeromq_pre_kwargs,
)
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs]
class ZeroMQMiddlewarePubSubPublish(
ZeroMQMiddlewarePubSub, metaclass=SingletonOptimized
):
[docs]
@staticmethod
def activate(**kwargs):
"""
Activate the ZeroMQ PUB/SUB middleware. This method should be called to initialize the middleware.
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
zeromq_post_kwargs = {}
zeromq_pre_kwargs = {}
for key, value in kwargs.items():
try:
getattr(zmq, key)
if key in ZEROMQ_POST_OPTS:
zeromq_post_kwargs[key] = value
else:
zeromq_pre_kwargs[key] = value
except AttributeError:
pass
ZeroMQMiddlewarePubSubPublish(
zeromq_proxy_kwargs=kwargs,
zeromq_post_kwargs=zeromq_post_kwargs,
**zeromq_pre_kwargs,
)
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs]
class ZeroMQMiddlewareReqRep(metaclass=SingletonOptimized):
"""
ZeroMQ REQ/REP middleware wrapper. This class is a singleton, so it can be instantiated only once. The ``activate``
method should be called to initialize the middleware. The ``deinit`` method should be called to deinitialize the
middleware and destroy all connections. The ``activate`` and ``deinit`` methods are automatically called when the
class is instantiated and when the program exits, respectively.
"""
[docs]
@staticmethod
def activate(**kwargs):
"""
Activate the ZeroMQ REQ/REP middleware. This method should be called to initialize the middleware.
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
zeromq_post_kwargs = {}
zeromq_pre_kwargs = {}
for key, value in kwargs.items():
try:
getattr(zmq, key)
if key in ZEROMQ_POST_OPTS:
zeromq_post_kwargs[key] = value
else:
zeromq_pre_kwargs[key] = value
except AttributeError:
pass
ZeroMQMiddlewareReqRep(
zeromq_proxy_kwargs=kwargs,
zeromq_post_kwargs=zeromq_post_kwargs,
**zeromq_pre_kwargs,
)
[docs]
def __init__(
self,
zeromq_proxy_kwargs: Optional[dict] = None,
zeromq_post_kwargs: Optional[dict] = None,
*args,
**kwargs,
):
"""
Initialize the ZeroMQ REQ/REP middleware. This method is automatically called when the class is instantiated.
:param zeromq_proxy_kwargs: Optional[dict]: Keyword arguments to be passed to the ZeroMQ proxy initialization function
:param zeromq_post_kwargs: Optional[dict]: Keyword arguments to be passed to the ZeroMQ initialization function (these are ZeroMQ options)
:param args: list: Positional arguments to be passed to the ZeroMQ initialization function
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
self.zeromq_proxy_kwargs = zeromq_proxy_kwargs or {}
self.zeromq_kwargs = zeromq_post_kwargs or {}
logging.info("Initialising ZeroMQ REQ/REP middleware")
self.ctx = zmq.Context.instance()
for socket_property in kwargs.items():
if isinstance(socket_property[1], str):
self.ctx.setsockopt_string(
getattr(zmq, socket_property[0]), socket_property[1]
)
else:
self.ctx.setsockopt(
getattr(zmq, socket_property[0]), socket_property[1]
)
atexit.register(MiddlewareCommunicator.close_all_instances)
atexit.register(self.deinit)
if zeromq_proxy_kwargs is not None and zeromq_proxy_kwargs:
if zeromq_proxy_kwargs["proxy_broker_spawn"] == "process":
self.proxy = multiprocessing.Process(
name="zeromq_reqrep_broker",
target=self.__init_device,
kwargs=zeromq_proxy_kwargs,
)
self.proxy.daemon = True
self.proxy.start()
else: # if threaded
self.proxy = threading.Thread(
name="zeromq_reqrep_broker",
target=self.__init_device,
kwargs=zeromq_proxy_kwargs,
)
self.proxy.setDaemon(
True
) # deprecation warning Python3.10. Previous Python versions only support this
self.proxy.start()
pass
@staticmethod
def __init_device(
socket_rep_address: str = "tcp://127.0.0.1:5559",
socket_req_address: str = "tcp://127.0.0.1:5560",
**kwargs,
):
"""
Initialize the ZeroMQ REQ/REP device broker.
:param socket_rep_address: str: The address of the REP socket
:param socket_req_address: str: The address of the REQ socket
"""
xrep = zmq.Context.instance().socket(zmq.XREP)
try:
xrep.bind(socket_rep_address)
except zmq.ZMQError as e:
logging.error(f"[ZeroMQ] {e} {socket_rep_address}")
return
xreq = zmq.Context.instance().socket(zmq.XREQ)
try:
xreq.bind(socket_req_address)
except zmq.ZMQError as e:
logging.error(f"[ZeroMQ] {e} {socket_req_address}")
return
# logging.info(f"[ZeroMQ] Intialising REQ/REP device broker")
try:
zmq.proxy(xrep, xreq)
except Exception as e:
# WORKAROUND(fabawi): The errno in ZMQERROR changes arbitrarily but the message seems consistent. According
# to https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context when ctx.destroy() is called, and the
# context is created in another thread, the operation is unsafe. If we want to maintain daemonized spawning
# of brokers, we have to apply this workaround. There is no way to let the proxy know that the context has
# been destroyed, so we let it crash. This only happens when the spawning method is `thread`; `process`
# (default method) will hang forever. This can be done by setting the environment variables
# WRAPYFI_ZEROMQ_PROXY_BROKER_SPAWN=thread WRAPYFI_ZEROMQ_PUBSUB_MONITOR_LISTENER_SPAWN=thread
if str(e) == "Socket operation on non-socket":
pass
else:
logging.error(
f"[ZeroMQ] An error occurred in the ZeroMQ proxy: {str(e)}."
)
[docs]
@staticmethod
def deinit():
logging.info("Deinitializing ZeroMQ middleware")
zmq.Context.instance().destroy()
[docs]
class ZeroMQMiddlewareParamServer(metaclass=SingletonOptimized):
"""
ZeroMQ parameter server middleware wrapper. This class is a singleton, so it can be instantiated only once. The
``activate`` method should be called to initialize the middleware. The ``deinit`` method should be called to
deinitialize the middleware and destroy all connections. The ``activate`` and ``deinit`` methods are automatically
called when the class is instantiated and when the program exits, respectively.
Note: This parameter server is experimental and not fully tested.
"""
[docs]
@staticmethod
def activate(**kwargs):
zeromq_post_kwargs = {}
zeromq_pre_kwargs = {}
for key, value in kwargs.items():
try:
getattr(zmq, key)
if key in ZEROMQ_POST_OPTS:
zeromq_post_kwargs[key] = value
else:
zeromq_pre_kwargs[key] = value
except AttributeError:
pass
ZeroMQMiddlewareParamServer(
zeromq_proxy_kwargs=kwargs,
zeromq_post_kwargs=zeromq_post_kwargs,
**zeromq_pre_kwargs,
)
[docs]
def __init__(
self,
zeromq_proxy_kwargs: Optional[dict] = None,
zeromq_post_kwargs: Optional = None,
*args,
**kwargs,
):
"""
Initialize the ZeroMQ parameter server middleware. This method is automatically called when the class is
instantiated.
:param zeromq_proxy_kwargs: Optional[dict]: Keyword arguments to be passed to the ZeroMQ proxy initialization function
:param zeromq_post_kwargs: Optional[dict]: Keyword arguments to be passed to the ZeroMQ initialization function (these are ZeroMQ options)
:param kwargs: dict: Keyword arguments to be passed to the ZeroMQ initialization function
"""
self.zeromq_proxy_kwargs = zeromq_proxy_kwargs or {}
self.zeromq_kwargs = zeromq_post_kwargs or {}
logging.info("Initialising ZeroMQ Parameter Server")
self.ctx = zmq.Context.instance()
for socket_property in kwargs.items():
if isinstance(socket_property[1], str):
self.ctx.setsockopt_string(
getattr(zmq, socket_property[0]), socket_property[1]
)
else:
self.ctx.setsockopt(
getattr(zmq, socket_property[0]), socket_property[1]
)
atexit.register(MiddlewareCommunicator.close_all_instances)
atexit.register(self.deinit)
if zeromq_proxy_kwargs is not None and zeromq_proxy_kwargs:
self.manager = multiprocessing.Manager()
self.params = self.manager.dict()
self.params["WRAPYFI_ACTIVE"] = "True"
if zeromq_proxy_kwargs["proxy_broker_spawn"] == "process":
self.param_broadcaster = multiprocessing.Process(
name="zeromq_param_broadcaster",
target=self.__init_broadcaster,
kwargs=zeromq_proxy_kwargs,
args=(self.params,),
)
self.param_broadcaster.daemon = True
self.param_broadcaster.start()
self.param_server = multiprocessing.Process(
name="zeromq_param_server",
target=self.__init_server,
kwargs=zeromq_proxy_kwargs,
args=(self.params,),
)
self.param_server.daemon = True
self.param_server.start()
else: # if threaded
self.param_broadcaster = threading.Thread(
name="zeromq_param_broadcaster",
target=self.__init_broadcaster,
kwargs=zeromq_proxy_kwargs,
args=(self.params,),
)
self.param_broadcaster.setDaemon(
True
) # deprecation warning Python3.10. Previous Python versions only support this
self.param_broadcaster.start()
self.param_server = threading.Thread(
name="zeromq_param_server",
target=self.__init_server,
kwargs=zeromq_proxy_kwargs,
args=(self.params,),
)
self.param_server.setDaemon(
True
) # deprecation warning Python3.10. Previous Python versions only support this
self.param_server.start()
pass
@staticmethod
def __init_broadcaster(
params,
param_pub_address: str = "tcp://127.0.0.1:5655",
param_sub_address: str = "tcp://127.0.0.1:5656",
param_poll_interval=1,
verbose=False,
**kwargs,
):
"""
Initialize the ZeroMQ parameter server broadcaster.
:param params: dict: The parameters to be broadcasted
:param param_pub_address: str: The address of the PUB socket
:param param_sub_address: str: The address of the SUB socket
:param param_poll_interval: int: The polling interval for the parameter server
:param verbose: bool: Whether to print debug messages
"""
update_trigger = False
cached_params = {}
root_topics = set()
ctx = zmq.Context.instance()
# create XPUB
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind(param_pub_address)
# create XSUB
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.bind(param_sub_address)
# connect a PUB socket to send parameters
param_server = ctx.socket(zmq.PUB)
param_server.connect(param_sub_address)
poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)
if verbose:
logging.info(f"[ZeroMQ] Intialising PUB/SUB device broker")
while True:
# get event
event = dict(poller.poll(param_poll_interval))
if xpub_socket in event:
message = xpub_socket.recv_multipart()
if verbose:
logging.info(
"[ZeroMQ BROKER] xpub_socket recv message: %r" % message
)
if message[0].startswith(b"\x00"):
root_topics.remove(message[0][1:].decode("utf-8"))
elif message[0].startswith(b"\x01"):
root_topics.add(message[0][1:].decode("utf-8"))
xsub_socket.send_multipart(message)
if xsub_socket in event:
message = xsub_socket.recv_multipart()
if verbose:
logging.info(
"[ZeroMQ BROKER] xsub_socket recv message: %r" % message
)
if message[0].startswith(b"\x01") or message[0].startswith(b"\x00"):
xpub_socket.send_multipart(message)
else:
fltr_key = message[0].decode("utf-8")
fltr_message = {
key: val
for key, val in params.items()
if key.startswith(fltr_key)
}
if verbose:
logging.info(
"[ZeroMQ BROKER] xsub_socket filtered message: %r"
% fltr_message
)
for key, val in fltr_message.items():
prefix, param = key.rsplit("/", 1) if "/" in key else ("", key)
xpub_socket.send_multipart(
[
prefix.encode("utf-8"),
param.encode("utf-8"),
val.encode("utf-8"),
]
)
# xpub_socket.send_multipart(message)
if event:
update_trigger = True
if param_server is not None:
update_trigger, cached_params = (
ZeroMQMiddlewareParamServer.publish_params(
param_server, params, cached_params, root_topics, update_trigger
)
)
[docs]
@staticmethod
def publish_params(
param_server,
params: dict,
cached_params: dict,
root_topics: set,
update_trigger: bool,
):
"""
Publish parameters to the parameter server.
:param param_server: zmq.Socket: The parameter server socket
:param params: dict: The parameters to be published
:param cached_params: dict: The cached parameters. This is used to check whether the parameters have changed
:param root_topics: set: The root topics. This is used to check whether there are any active subscribers
:param update_trigger: bool: Whether to trigger an update of the parameters
:return: Tuple[bool, dict]: Whether to trigger an update of the parameters and the cached parameters
"""
# check if there are any subscribed clients before publishing updates
if not root_topics:
return None, None
if not any((update_trigger, root_topics)) and cached_params == params:
return None, None
else:
time.sleep(0.01)
update_trigger = False
cached_params = params.copy()
# publish updates for all parameters to subscribed clients
for key, val in params.items():
prefix, param = key.rsplit("/", 1) if "/" in key else ("", key)
param_server.send_multipart(
[prefix.encode("utf-8"), param.encode("utf-8"), val.encode("utf-8")]
)
return update_trigger, cached_params
@staticmethod
def __init_server(
params: dict, param_reqrep_address: str = "tcp://127.0.0.1:5659", **kwargs
):
"""
Initialize the ZeroMQ parameter server.
:param params: dict: The parameters to be published
:param param_reqrep_address: str: The address of the REQ/REP socket
"""
ctx = zmq.Context.instance()
request_server = ctx.socket(zmq.REP)
request_server.bind(param_reqrep_address)
while True:
request = request_server.recv_string()
if request.startswith("get"):
try:
# extract the parameter name and namespace prefix from the request
prefix, param = (
request[4:].rsplit("/", 1)
if "/" in request[4:]
else ("", request[4:])
)
# construct the full parameter name with the namespace prefix
full_param = "/".join([prefix, param]) if prefix else param
if full_param in params:
request_server.send_string(str(params[full_param]))
else:
request_server.send_string("error:::parameter does not exist")
except ValueError:
request_server.send_string("error:::malformed request")
elif request.startswith("read"):
try:
# extract the parameter name and namespace prefix from the request
prefix = request[5:]
# construct the full parameter name with the namespace prefix
if any(param.startswith(prefix) for param in params.keys()):
request_server.send_string(f"success:::{prefix}")
else:
request_server.send_string("error:::parameter does not exist")
except ValueError:
request_server.send_string("error:::malformed request")
elif request.startswith("set"):
try:
# extract the parameter name, namespace prefix and value from the request
prefix, param, value = request[4:].rsplit("/", 2)
# construct the full parameter name with the namespace prefix
full_param = "/".join([prefix, param]) if prefix else param
params[full_param] = value
request_server.send_string(f"success:::{prefix}")
except ValueError:
request_server.send_string("error:::malformed request")
elif request.startswith("delete"):
try:
# extract the parameter name and namespace prefix from the request
prefix, param = request[7:].rsplit("/", 1)
# construct the full parameter name with the namespace prefix
full_param = "/".join([prefix, param]) if prefix else param
if full_param in params:
del params[full_param]
request_server.send_string(f"success:::{prefix}")
else:
request_server.send_string("error:::parameter does not exist")
except ValueError:
request_server.send_string("error:::malformed request")
else:
request_server.send_string("error:::invalid request")
[docs]
@staticmethod
def deinit():
"""
Deinitialize the ZeroMQ parameter server.
"""
logging.info("Deinitializing ZeroMQ Parameter Server")
zmq.Context.instance().destroy()