Source code for wrapyfi.publishers

import logging

from wrapyfi.connect.publishers import Publisher, Publishers


@Publishers.register("MMO", "fallback")
class FallbackPublisher(Publisher):
    """
    Stand-in publisher that performs no actual publishing.

    Registered with ``Publishers`` under the ``("MMO", "fallback")`` pair.
    Constructing it logs a warning naming the middleware or object type that
    was missing; ``establish`` and ``close`` do nothing, and ``publish``
    hands its argument straight back.

    NOTE(review): "MMO" presumably abbreviates "missing middleware object" --
    confirm against the registry's conventions.
    """

    def __init__(
        self,
        name: str,
        out_topic: str,
        carrier: str = "",
        should_wait: bool = True,
        missing_middleware_object: str = "",
        **kwargs,
    ):
        """
        Log a warning and initialise the base ``Publisher``.

        :param name: Forwarded positionally to ``Publisher.__init__``
        :param out_topic: Forwarded positionally to ``Publisher.__init__``
        :param carrier: Forwarded to ``Publisher.__init__`` as ``carrier``
        :param should_wait: Forwarded to ``Publisher.__init__`` as ``should_wait``
        :param missing_middleware_object: Name of the missing middleware or
            object type; included in the warning and stored on the instance
        :param kwargs: Extra keyword arguments forwarded to ``Publisher.__init__``
        """
        # The warning is emitted before the base initialiser runs, so it is
        # logged even if the base initialiser raises.
        warning_text = (
            "Fallback publisher employed due to missing middleware or object type: "
            f"{missing_middleware_object}"
        )
        logging.warning(warning_text)

        Publisher.__init__(
            self,
            name,
            out_topic,
            carrier=carrier,
            should_wait=should_wait,
            **kwargs,
        )
        self.missing_middleware_object = missing_middleware_object

    def establish(self, repeats: int = -1, **kwargs):
        """
        Do nothing; there is no connection to establish.

        :param repeats: Accepted but ignored
        :param kwargs: Accepted but ignored
        :return: Always ``None``
        """
        return None

    def publish(self, obj):
        """
        Return ``obj`` unchanged without sending it anywhere.

        :param obj: The object that would have been published
        :return: The same ``obj`` that was passed in
        """
        return obj

    def close(self):
        """
        Do nothing; there are no resources to release.

        :return: Always ``None``
        """
        return None