import logging
import atexit
import threading
import socketio
from wrapyfi.utils.core_utils import SingletonOptimized
from wrapyfi.connect.wrapper import MiddlewareCommunicator
[docs]
class WebSocketMiddlewarePubSub(metaclass=SingletonOptimized):
"""
WebSocket middleware wrapper. This class is a singleton, so it can be instantiated only once.
The `activate` method should be called to initialize the middleware.
The `deinit` method should be called to deinitialize the middleware and destroy all connections.
"""
_instance = None
[docs]
@staticmethod
def activate(socket_address: str = "http://127.0.0.1:5000", **kwargs):
"""
Activate the WebSocket middleware. Initializes the WebSocket client with the provided address and options.
:param socket_address: str: The WebSocket server address
:param kwargs: dict: Additional keyword arguments for the middleware
"""
if WebSocketMiddlewarePubSub._instance is None:
WebSocketMiddlewarePubSub._instance = WebSocketMiddlewarePubSub(
socket_address=socket_address, **kwargs
)
return WebSocketMiddlewarePubSub._instance
[docs]
def __init__(
self,
socket_address: str = "http://127.0.0.1:5000",
websocket_kwargs: dict = None,
**kwargs,
):
"""
Initialize the WebSocket middleware. This method is automatically called when the class is instantiated.
:param socket_address: str: The WebSocket server address
:param websocket_kwargs: dict: Additional keyword arguments for the WebSocket connection
:param kwargs: dict: Additional keyword arguments for compatibility with the interface
"""
logging.info(f"Initializing WebSocket middleware on {socket_address}")
self.socket_address = socket_address
self.websocket_kwargs = websocket_kwargs or {}
self.socketio_client = socketio.Client()
self.connected = False
@self.socketio_client.event
def connect():
logging.info(
f"[WebSocketMiddlewarePubSub] Connected to {self.socket_address}"
)
self.connected = True
@self.socketio_client.event
def disconnect():
logging.info(
f"[WebSocketMiddlewarePubSub] Disconnected from {self.socket_address}"
)
self.connected = False
self.socketio_client.start_background_task(self._connect_client)
atexit.register(MiddlewareCommunicator.close_all_instances)
atexit.register(self.deinit)
def _connect_client(self):
"""Connect the WebSocket client."""
try:
self.socketio_client.connect(
self.socket_address,
namespaces=["/"],
**self.websocket_kwargs,
retry=True,
)
self.socketio_client.wait() # Wait for messages
except Exception as e:
logging.error(
f"[WebSocketMiddlewarePubSub] Error connecting to {self.socket_address}: {e}"
)
[docs]
def register_callback(self, topic: str, callback):
"""
Register an event handler for a specific topic.
:param topic: str: The topic/event to listen to
:param callback: callable: The function to call when the event occurs
"""
self.socketio_client.on(topic, callback)
logging.info(
f"[WebSocketMiddlewarePubSub] Registered callback for topic {topic}"
)
[docs]
def is_connected(self) -> bool:
"""
Check whether the WebSocket client is connected.
:return: bool: True if connected, False otherwise.
"""
return self.connected
[docs]
@staticmethod
def deinit():
"""
Deinitialize the WebSocket middleware. This method is automatically called when the program exits.
"""
logging.info("[WebSocketMiddlewarePubSub] Deinitializing WebSocket client")
if (
WebSocketMiddlewarePubSub._instance
and WebSocketMiddlewarePubSub._instance.socketio_client.connected
):
WebSocketMiddlewarePubSub._instance.socketio_client.disconnect()
WebSocketMiddlewarePubSub._instance.connected = False