Source code for eventbus.domain.eventbus

# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import List, Tuple, Type, Union

from eventbus.domain.whitehead import TEvent


class AbstractEventHandler(ABC):
    """Abstract base class for handlers that react to a subset of events.

    Concrete subclasses declare which events they care about through the
    ``event_type`` property and implement the ``handler`` coroutine.
    ``filter`` and ``predicate`` are helpers built on top of ``event_type``.
    """

    # Wildcard selector: an ``event_type`` equal to this value matches every
    # event, regardless of its class.
    ASTERISK = "*"

    @property
    @abstractmethod
    def event_type(self) -> Union[str, Type, Tuple]:
        """Return the selector used by ``filter`` to pick events.

        Either ``ASTERISK`` (match everything) or a value accepted as the
        second argument of ``isinstance`` (a class or a tuple of classes).
        Any other string is passed to ``isinstance`` unchanged and therefore
        raises ``TypeError`` as soon as an event is checked.
        """

    def filter(self, events: List[TEvent]) -> List[TEvent]:
        """Return the events matching ``event_type``, in their original order.

        :param events: candidate events to inspect.
        :return: a new list; the input list is never returned or mutated.
        """
        # Read the property once instead of up to twice per element.
        event_type = self.event_type
        if event_type == self.ASTERISK:
            # Wildcard: keep every event, but still hand back a fresh list.
            return list(events)
        return [event for event in events if isinstance(event, event_type)]

    def predicate(self, events) -> bool:
        """Return ``True`` when at least one of ``events`` passes ``filter``.

        Delegates to ``self.filter`` so that subclasses overriding ``filter``
        automatically change what counts as a match here.
        """
        return len(self.filter(events)) > 0

    @abstractmethod
    async def handler(self, events: List[TEvent]):
        """Process ``events``; must be implemented by concrete subclasses.

        NOTE(review): presumably called by the event bus with the events
        selected by ``filter`` -- confirm against the caller.
        """