Source code for eventbus.application.eventbus
# -*- coding: utf-8 -*-
from typing import List
from eventbus.domain.eventbus import AbstractEventHandler
from eventbus.domain.whitehead import TEvent
# Module-level registry of subscribed event handlers. The functions in this
# module read and mutate this list in place.
_subscriptions: List[AbstractEventHandler] = []
def subscribe(handler: AbstractEventHandler) -> None:
    """Add *handler* to the subscription registry.

    The handler is appended to the end of the registry. If an equal handler
    is already registered, the call does nothing.

    :param handler: the event handler to register.
    """
    if handler in _subscriptions:
        # Already registered: keep a single entry per handler.
        return
    _subscriptions.append(handler)
def unsubscribe(handler: AbstractEventHandler) -> None:
    """Remove *handler* from the subscription registry.

    If the handler is not currently registered, the call does nothing.

    :param handler: the event handler to remove.
    """
    if handler not in _subscriptions:
        # Nothing to remove; unsubscribing an unknown handler is a no-op.
        return
    _subscriptions.remove(handler)
async def publish(events: List[TEvent]) -> None:
    """Offer *events* to every registered handler.

    Each handler's ``predicate`` is called with the events; when it returns a
    truthy value, the handler's ``handler`` coroutine is awaited with the same
    events. Handlers are processed one at a time, in registry order. An
    exception raised by a predicate or a handler propagates to the caller and
    the remaining handlers are not invoked.

    :param events: the events passed to each handler's predicate and handler.
    """
    # Iterate over a copy so that changes to the registry made while
    # publishing do not affect this iteration.
    snapshot = list(_subscriptions)
    for subscriber in snapshot:
        if not subscriber.predicate(events):
            continue
        await subscriber.handler(events)
def is_event_handlers_empty() -> bool:
    """Report whether the subscription registry holds no handlers.

    :return: ``True`` when no handler is registered, ``False`` otherwise.
    """
    # An empty list is falsy, so negating it gives the emptiness check.
    return not _subscriptions
def clear_event_handlers() -> None:
    """Remove every handler from the subscription registry."""
    # Empty the existing list in place rather than rebinding the name, so
    # other references to the same list object observe the change.
    del _subscriptions[:]