Source code for wrapyfi.listeners

import logging

from wrapyfi.connect.listeners import Listener, Listeners, ListenerWatchDog


@Listeners.register("MMO", "fallback")
class FallbackListener(Listener):
    """
    Placeholder listener registered under the ("MMO", "fallback") key.

    Constructing it logs a warning naming the missing middleware or object
    type; ``establish``, ``listen`` and ``close`` all do nothing and return
    ``None``.
    """

    def __init__(
        self,
        name: str,
        in_topic: str,
        carrier: str = "tcp",
        should_wait: bool = True,
        missing_middleware_object: str = "",
        **kwargs,
    ):
        """
        Warn that the fallback is in use, then initialise the base listener.

        :param name: str: Forwarded positionally to ``Listener.__init__``
        :param in_topic: str: Forwarded positionally to ``Listener.__init__``
        :param carrier: str: Forwarded to ``Listener.__init__``. Default is 'tcp'
        :param should_wait: bool: Forwarded to ``Listener.__init__``. Default is True
        :param missing_middleware_object: str: Name of the missing middleware or
            object type; included in the warning and kept on the instance.
            Default is ''
        :param kwargs: dict: Extra keyword arguments forwarded unchanged to
            ``Listener.__init__``
        """
        # Build the message first so the logging call itself stays short.
        warning_text = (
            "Fallback listener employed due to missing middleware or object type: "
            f"{missing_middleware_object}"
        )
        logging.warning(warning_text)

        Listener.__init__(
            self,
            name,
            in_topic,
            carrier=carrier,
            should_wait=should_wait,
            **kwargs,
        )
        self.missing_middleware_object = missing_middleware_object

    def establish(self, repeats: int = -1, **kwargs):
        """
        Do nothing; the fallback has no connection to establish.

        :param repeats: int: Accepted but ignored. Default is -1
        :param kwargs: dict: Accepted but ignored
        :return: None
        """
        return None

    def listen(self):
        """
        Do nothing; the fallback never receives anything.

        :return: None
        """
        return None

    def close(self):
        """
        Do nothing; the fallback holds nothing to release.

        :return: None
        """
        return None