Source code for eventbus.application.eventbus

# -*- coding: utf-8 -*-
from typing import List

from eventbus.domain.eventbus import AbstractEventHandler
from eventbus.domain.whitehead import TEvent


_subscriptions: List[AbstractEventHandler] = []


[docs]def subscribe(handler: AbstractEventHandler) -> None: if handler not in _subscriptions: _subscriptions.append(handler)
[docs]def unsubscribe(handler: AbstractEventHandler) -> None: if handler in _subscriptions: _subscriptions.remove(handler)
[docs]async def publish(events: List[TEvent]) -> None: for handler in _subscriptions[:]: if handler.predicate(events): await handler.handler(events)
[docs]def is_event_handlers_empty() -> bool: return len(_subscriptions) == 0
[docs]def clear_event_handlers() -> None: _subscriptions.clear()