Source code for eventbus.domain.entity

# -*- coding: utf-8 -*-
from abc import ABCMeta
from datetime import datetime
from uuid import UUID, uuid4
from typing import Any, Dict, List, Type, Union, TypeVar, Optional

from eventbus.application.eventbus import publish
from eventbus.domain.decorators import subclassevents
from eventbus.domain.events import (
    EventWithOriginatorID, CreatedEvent, AttributeChangedEvent,
    DomainEvent, EventWithOriginatorVersion, EventWithTimestamp, DiscardedEvent
)
from eventbus.domain.exceptions import OriginatorIDError, EntityIsDiscarded, OriginatorVersionError
from eventbus.domain.whitehead import EnduringObject
from eventbus.util.topic import get_topic, resolve_topic


[docs]class MetaDomainEntity(ABCMeta): __subclassevents__ = False def __init__(cls, name: str, *args: Any, **kwargs: Any) -> None: # noqa super().__init__(name, *args, **kwargs) if cls.__subclassevents__ is True: subclassevents(cls)
TDomainEntity = TypeVar("TDomainEntity", bound="DomainEntity") TDomainEvent = TypeVar("TDomainEvent", bound="DomainEntity.Event")
[docs]class DomainEntity(EnduringObject, metaclass=MetaDomainEntity): """ Supertype for domain model entity. """ __subclassevents__ = False
[docs] class Event(EventWithOriginatorID[TDomainEntity]): """ Supertype for events of domain model entities. """ def __check_obj__(self, obj: TDomainEntity) -> None: """ Checks state of obj before mutating. :param obj: Domain entity to be checked. :raises OriginatorIDError: if the originator_id is mismatched """ # Assert ID matches originator ID. if obj.id != self.originator_id: raise OriginatorIDError( "'{0}' not equal to event originator ID '{1}'" "".format(obj.id, self.originator_id) )
@classmethod async def __create__( cls: Type[TDomainEntity], originator_id: Optional[UUID] = None, event_class: Optional[Type["DomainEntity.Created[TDomainEntity]"]] = None, **kwargs: Any, ) -> TDomainEntity: """ Creates a new domain entity. Constructs a "created" event, constructs the entity object from the event, publishes the "created" event, and returns the new domain entity object. :param cls DomainEntity: Class of domain event :param originator_id: ID of the new domain entity (defaults to ``uuid4()``). :param event_class: Domain event class to be used for the "created" event. :param kwargs: Other named attribute values of the "created" event. :return: New domain entity object. :rtype: DomainEntity """ if originator_id is None: originator_id = uuid4() if event_class is None: created_event_class: Type[DomainEntity.Created[TDomainEntity]] = cls.Created else: created_event_class = event_class event = created_event_class( originator_id=originator_id, originator_topic=get_topic(cls), **kwargs ) obj = event.__mutate__(None) if obj is None: raise ValueError("{0} returned None".format(type(event).__mutate__.__qualname__)) await obj.__publish__(event) return obj
[docs] class Created(CreatedEvent[TDomainEntity], Event[TDomainEntity]): """ Triggered when an entity is created. """ def __init__(self, originator_topic: str, **kwargs: Any): super(DomainEntity.Created, self).__init__( originator_topic=originator_topic, **kwargs ) @property def originator_topic(self) -> str: """ Topic (a string) representing the class of the originating domain entity. :rtype: str """ return self.__dict__["originator_topic"] def __mutate__(self, obj: Optional[TDomainEntity]) -> Optional[TDomainEntity]: """ Constructs object from an entity class, which is obtained by resolving the originator topic, unless it is given as method argument ``entity_class``. entity_class: Class of domain entity to be constructed. """ entity_class: Type[TDomainEntity] = resolve_topic(self.originator_topic) return entity_class(**self.__entity_kwargs__) @property def __entity_kwargs__(self) -> Dict[str, Any]: kwargs = self.__dict__.copy() kwargs["id"] = kwargs.pop("originator_id") kwargs.pop("discarded", None) kwargs.pop("originator_topic", None) kwargs.pop("__event_topic__", None) return kwargs
def __init__(self, id: UUID, discarded: bool = False, **kwargs): super().__init__() self._id = id self.__is_discarded__ = discarded @property def id(self) -> UUID: """The immutable ID of the domain entity. This value is set using the ``originator_id`` of the "created" event constructed by ``__create__()``. An entity ID allows an instance to be referenced and distinguished from others, even though its state may change over time. This attribute has the normal "public" format for a Python object attribute name, because by definition all domain entities have an ID. """ return self._id
[docs] class AttributeChanged(Event[TDomainEntity], AttributeChangedEvent[TDomainEntity]): """ Triggered when a named attribute is assigned a new value. """ def __mutate__(self, obj: Optional[TDomainEntity]) -> Optional[TDomainEntity]: obj = super(DomainEntity.AttributeChanged, self).__mutate__(obj) setattr(obj, self.name, self.value) return obj
async def __change_attribute__(self: TDomainEntity, name: str, value: Any, **kwargs) -> None: """ Changes named attribute with the given value, by triggering an AttributeChanged event. """ event_class: Type["DomainEntity.AttributeChanged[TDomainEntity]"] = self.AttributeChanged await self.__trigger_event__(event_class=event_class, name=name, value=value, **kwargs)
[docs] class Discarded(DiscardedEvent[TDomainEntity], Event[TDomainEntity]): """ Triggered when a DomainEntity is discarded. """ def __mutate__(self, obj: Optional[TDomainEntity]) -> Optional[TDomainEntity]: obj = super(DomainEntity.Discarded, self).__mutate__(obj) if obj is not None: obj.__is_discarded__ = True return None
async def __discard__(self: TDomainEntity, **kwargs) -> None: """ Discards self, by triggering a Discarded event. """ event_class: Type["DomainEntity.Discarded[TDomainEntity]"] = self.Discarded await self.__trigger_event__(event_class=event_class, **kwargs) def __assert_not_discarded__(self) -> None: """ Asserts that this entity has not been discarded. Raises EntityIsDiscarded exception if entity has been discarded already. """ if self.__is_discarded__: raise EntityIsDiscarded("Entity is discarded") async def __trigger_event__(self, event_class: Type[TDomainEvent], **kwargs: Any) -> None: """ Constructs, applies, and publishes a domain event. """ self.__assert_not_discarded__() event: TDomainEvent = event_class(originator_id=self.id, **kwargs) self.__mutate__(event) await self.__publish__(event) def __mutate__(self, event: TDomainEvent) -> None: """ Mutates this entity with the given event. This method calls on the event object to mutate this entity, because the mutation behaviour of different types of events was usefully factored onto the event classes, and the event mutate() method is the most convenient way to defined behaviour in domain models. However, as an alternative to implementing the mutate() method on domain model events, this method can be extended with a method that is capable of mutating an entity for all the domain event classes introduced by the entity class. Similarly, this method can be overridden entirely in subclasses, so long as all of the mutation behaviour is implemented in the mutator function, including the mutation behaviour of the events defined on the library event classes that would no longer be invoked. However, if the entity class defines a mutator function, or if a separate mutator function is used, then it must be involved in the event sourced repository used to replay events, which by default knows nothing about the domain entity class. In practice, this means having a repository for each kind of entity, rather than the application just having one repository, with each repository having a mutator function that can project the entity events into an entity. """ if not isinstance(event, DomainEntity.Event): raise ValueError("Given Event is not an instance of DomainEntity.Event") event.__check_obj__(self) event.__mutate__(self) async def __publish__(self, events: Union[TDomainEvent, List[TDomainEvent]]) -> None: """ Publishes given event for subscribers in the application. :param events: domain event or list of events """ if isinstance(events, DomainEvent): events = [events] await publish(events) def __eq__(self, other: object) -> bool: return type(self) == type(other) and self.__dict__ == other.__dict__ def __ne__(self, other: object) -> bool: return not self.__eq__(other)
TVersionedEntity = TypeVar("TVersionedEntity", bound="VersionedEntity") TVersionedEvent = TypeVar("TVersionedEvent", bound="VersionedEntity.Event")
[docs]class VersionedEntity(DomainEntity): def __init__(self, __version__: int, **kwargs: Any): super().__init__(**kwargs) self.___version__: int = __version__ @property def __version__(self) -> int: return self.___version__ async def __trigger_event__(self, event_class: Type[TDomainEvent], **kwargs: Any) -> None: """ Increments the version number when an event is triggered. The event carries the version number that the originator will have when the originator is mutated with this event. (The event's "originator" version isn't the version of the originator before the event was triggered, but represents the result of the work of incrementing the version, which is then set in the event as normal. The Created event has version 0, and a newly created instance is at version 0. The second event has originator version 1, and so will the originator when the second event has been applied. """ # Do the work of incrementing the version number. next_version = self.__version__ + 1 # Trigger an event with the result of this work. await super(VersionedEntity, self).__trigger_event__( event_class=event_class, originator_version=next_version, **kwargs )
[docs] class Event( EventWithOriginatorVersion[TVersionedEntity], DomainEntity.Event[TVersionedEntity], ): """Supertype for events of versioned entities.""" def __mutate__(self, obj: Optional[TVersionedEntity]) -> Optional[TVersionedEntity]: obj = super(VersionedEntity.Event, self).__mutate__(obj) if obj is not None: obj.___version__ = self.originator_version return obj def __check_obj__(self, obj: TVersionedEntity) -> None: """ Extends superclass method by checking the event's originator version follows (1 +) this entity's version. """ super(VersionedEntity.Event, self).__check_obj__(obj) # Assert the version sequence is correct. if self.originator_version != obj.__version__ + 1: raise OriginatorVersionError( ( "Event takes entity to version {0}, " "but entity is currently at version {1}. " "Event type: '{2}', entity type: '{3}', entity ID: '{4}'" "".format( self.originator_version, obj.__version__, type(self).__name__, type(obj).__name__, obj._id, # noqa ) ) )
[docs] class Created(DomainEntity.Created[TVersionedEntity], Event[TVersionedEntity]): """Published when a VersionedEntity is created.""" def __init__(self, originator_version: int = 0, *args: Any, **kwargs: Any): super(VersionedEntity.Created, self).__init__(originator_version=originator_version, *args, **kwargs) @property def __entity_kwargs__(self) -> Dict[str, Any]: # Get super property. kwargs = super(VersionedEntity.Created, self).__entity_kwargs__ kwargs["__version__"] = kwargs.pop("originator_version") return kwargs
[docs] class AttributeChanged(Event[TVersionedEntity], DomainEntity.AttributeChanged[TVersionedEntity]): """Published when a VersionedEntity is changed."""
[docs] class Discarded(Event[TVersionedEntity], DomainEntity.Discarded[TVersionedEntity]): """Published when a VersionedEntity is discarded."""
TTimestampedEntity = TypeVar("TTimestampedEntity", bound="TimestampedEntity")
[docs]class TimestampedEntity(DomainEntity): def __init__(self, __created_on__: datetime, __updated_on__: datetime, **kwargs: Any): super(TimestampedEntity, self).__init__(**kwargs) self.___created_on__ = __created_on__ self.___updated_on__ = __updated_on__ self.___last_modified__ = __created_on__ @property def __created_on__(self) -> datetime: return self.___created_on__ @property def __updated_on__(self) -> datetime: return self.___updated_on__ @property def __last_modified__(self) -> datetime: return self.___last_modified__
[docs] class Event(DomainEntity.Event[TTimestampedEntity], EventWithTimestamp[TTimestampedEntity]): """Supertype for events of timestamped entities.""" def __mutate__(self, obj: Optional[TTimestampedEntity]) -> Optional[TTimestampedEntity]: """Updates 'obj' with values from self.""" obj = super(TimestampedEntity.Event, self).__mutate__(obj) if obj is not None: obj.___last_modified__ = self.timestamp return obj
[docs] class Created(DomainEntity.Created[TTimestampedEntity], Event[TTimestampedEntity]): """Published when a TimestampedEntity is created.""" @property def __entity_kwargs__(self) -> Dict[str, Any]: # Get super property. kwargs = super(TimestampedEntity.Created, self).__entity_kwargs__ timestamp = kwargs.pop("timestamp") kwargs["__created_on__"] = timestamp kwargs["__updated_on__"] = timestamp return kwargs
[docs] class AttributeChanged( Event[TTimestampedEntity], DomainEntity.AttributeChanged[TTimestampedEntity] ): """Published when a TimestampedEntity is changed.""" def __mutate__(self, obj: Optional[TTimestampedEntity]) -> Optional[TTimestampedEntity]: """Updates 'obj' with values from self.""" obj = super(TimestampedEntity.AttributeChanged, self).__mutate__(obj) if obj is not None: obj.___updated_on__ = self.timestamp return obj
[docs] class Discarded(Event[TTimestampedEntity], DomainEntity.Discarded[TTimestampedEntity]): """Published when a TimestampedEntity is discarded."""
TTimestampedVersionedEntity = TypeVar("TTimestampedVersionedEntity", bound="TimestampedVersionedEntity")
[docs]class TimestampedVersionedEntity(TimestampedEntity, VersionedEntity):
[docs] class Event( TimestampedEntity.Event[TTimestampedVersionedEntity], VersionedEntity.Event[TTimestampedVersionedEntity], ): """Supertype for events of timestamped, versioned entities."""
[docs] class Created( TimestampedEntity.Created[TTimestampedVersionedEntity], VersionedEntity.Created, Event[TTimestampedVersionedEntity], ): """Published when a TimestampedVersionedEntity is created."""
[docs] class AttributeChanged( Event[TTimestampedVersionedEntity], TimestampedEntity.AttributeChanged[TTimestampedVersionedEntity], VersionedEntity.AttributeChanged[TTimestampedVersionedEntity], ): """Published when a TimestampedVersionedEntity is created."""
[docs] class Discarded( Event[TTimestampedVersionedEntity], TimestampedEntity.Discarded[TTimestampedVersionedEntity], VersionedEntity.Discarded[TTimestampedVersionedEntity], ): """Published when a TimestampedVersionedEntity is discarded."""