Source code for eventbus.example.entity

# -*- coding: utf-8 -*-
from typing import Any, List, Optional

from eventbus.domain.aggregate import BaseAggregateRoot
from eventbus.domain.entity import TimestampedVersionedEntity


[docs]class ExampleInternal(TimestampedVersionedEntity): def __init__(self, value: int, **kwargs: Any): super().__init__(**kwargs) self.value = value
[docs] class Event(TimestampedVersionedEntity.Event): pass
[docs]class Example(BaseAggregateRoot): def __init__( self, first_name: str, last_name: str, age: int, internals: Optional[List[ExampleInternal]] = None, **kwargs: Any, ): super().__init__(**kwargs) self.first_name = first_name self.last_name = last_name self.age = age self.internals: List[ExampleInternal] = internals or [] @property def summ(self) -> int: return sum(internal.value for internal in self.internals)
[docs] class Event(BaseAggregateRoot.Event): pass
[docs] class ExampleInternalAdded(Event): internal: ExampleInternal
[docs] def mutate(self, obj): obj.internals.append(self.internal)
[docs] async def add(self, value: int): internal = await ExampleInternal.__create__(value=value) await self.__trigger_event__(Example.ExampleInternalAdded, internal=internal)