eventbus.util package
Submodules
eventbus.util.hashing module
eventbus.util.topic module
eventbus.util.topic.get_entity(event: TEvent) → TEntity
Get the originator entity for an event.
To support get_entity, the event must provide the originator_topic and __entity_kwargs__ properties. Only the Created event supports this protocol out of the box.
Parameters: event – Domain event.
Returns: Domain entity.
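As an illustration of the protocol described above, the following sketch shows one way an entity could be rebuilt from an event that exposes originator_topic and __entity_kwargs__. It is not the library's implementation. The `module#ClassName` topic format, the CreatedSketch class, the get_entity_sketch function, and the use of types.SimpleNamespace as a stand-in entity are all assumptions made for this example.

```python
import importlib
from typing import Any, Dict


class CreatedSketch:
    """Hypothetical event exposing the two properties the protocol names."""

    def __init__(self, originator_topic: str, **entity_kwargs: Any) -> None:
        self.originator_topic = originator_topic
        self._entity_kwargs = entity_kwargs

    @property
    def __entity_kwargs__(self) -> Dict[str, Any]:
        # Return a copy so callers cannot mutate the event's state.
        return dict(self._entity_kwargs)


def get_entity_sketch(event: Any) -> Any:
    """Rebuild the originator entity from an event (assumed behaviour)."""
    # Assumption: a topic looks like "package.module#ClassName".
    module_name, _, class_name = event.originator_topic.partition("#")
    entity_class = getattr(importlib.import_module(module_name), class_name)
    return entity_class(**event.__entity_kwargs__)


# types.SimpleNamespace stands in for a real domain entity class.
event = CreatedSketch("types#SimpleNamespace", id="a1", owner="alice")
entity = get_entity_sketch(event)
```

An event that lacks either property would fail with AttributeError in this sketch, which mirrors the note that only Created events support the protocol out of the box.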
eventbus.util.topic.get_topic(domain_class: type) → str
Returns a string describing a class.
Parameters: domain_class – A class.
Returns: A string describing the class.
eventbus.util.topic.resolve_attr(obj: Any, path: str) → Any
A recursive version of getattr for navigating dotted paths.
Parameters:
- obj – An object for which we want to retrieve a nested attribute.
- path – A dot-separated string containing zero or more attribute names.
Raises: AttributeError – If there is no such attribute.
Returns: The attribute referred to by the path.
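The behaviour documented for resolve_attr can be sketched in a few lines. This is a stand-in written for illustration, not the library's source. The name resolve_attr_sketch and the choice to return the object itself for an empty path are assumptions based on the "zero or more attribute names" wording.

```python
from typing import Any


def resolve_attr_sketch(obj: Any, path: str) -> Any:
    """Follow a dot-separated attribute path, one name per recursive call."""
    if not path:
        # Zero attribute names: nothing left to navigate (assumed behaviour).
        return obj
    head, _, tail = path.partition(".")
    # getattr raises AttributeError when `head` does not exist on `obj`.
    return resolve_attr_sketch(getattr(obj, head), tail)


class Outer:
    """Hypothetical nested classes used only to exercise the sketch."""

    class Inner:
        value = 42


nested_value = resolve_attr_sketch(Outer, "Inner.value")
same_object = resolve_attr_sketch(Outer, "")
```

Here nested_value is the class attribute reached through two getattr steps, and same_object is Outer itself.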
eventbus.util.topic.resolve_topic(topic: str) → Any
Resolves topic to the object it references.
Parameters: topic – A string describing a code object (e.g. an object class).
Raises: TopicResolutionError – If there is no such class.
Returns: Code object that the topic references.
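To show how a topic string and the object it names might round-trip, here is a minimal sketch. The documentation does not state the topic format, so the `module#QualifiedName` layout used here is an assumption, as are the names get_topic_sketch, resolve_topic_sketch, and TopicResolutionErrorSketch.

```python
import importlib
from collections import OrderedDict
from typing import Any


class TopicResolutionErrorSketch(Exception):
    """Hypothetical stand-in for the documented TopicResolutionError."""


def get_topic_sketch(domain_class: type) -> str:
    # Assumed format: "<module path>#<qualified class name>".
    return f"{domain_class.__module__}#{domain_class.__qualname__}"


def resolve_topic_sketch(topic: str) -> Any:
    module_name, _, attr_path = topic.partition("#")
    try:
        obj: Any = importlib.import_module(module_name)
        # Walk the dotted path so nested classes can be resolved too.
        for name in filter(None, attr_path.split(".")):
            obj = getattr(obj, name)
    except (ImportError, AttributeError, ValueError) as exc:
        raise TopicResolutionErrorSketch(topic) from exc
    return obj


topic = get_topic_sketch(OrderedDict)
resolved = resolve_topic_sketch(topic)
```

Wrapping the import and attribute errors in a single exception type gives callers one thing to catch, which is consistent with the documented Raises entry.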
eventbus.util.transcoding module
class eventbus.util.transcoding.ObjectJSONDecoder(object_hook=None, **kwargs)
Bases: json.decoder.JSONDecoder
class eventbus.util.transcoding.ObjectJSONEncoder(sort_keys=False)
Bases: json.encoder.JSONEncoder
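The page does not say which object types these classes handle. The sketch below only illustrates the general pattern of pairing a json.JSONEncoder subclass with a json.JSONDecoder subclass that share the constructor signatures shown above. The class names, the datetime support, and the "__datetime__" marker key are all hypothetical.

```python
import json
from datetime import datetime
from typing import Any, Callable, Dict, Optional


class DatetimeEncoderSketch(json.JSONEncoder):
    """Hypothetical encoder that tags datetime values with a marker key."""

    def __init__(self, sort_keys: bool = False) -> None:
        super().__init__(sort_keys=sort_keys)

    def default(self, o: Any) -> Any:
        if isinstance(o, datetime):
            return {"__datetime__": o.isoformat()}
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(o)


class DatetimeDecoderSketch(json.JSONDecoder):
    """Hypothetical decoder that reverses the encoder's marker objects."""

    def __init__(self, object_hook: Optional[Callable] = None, **kwargs: Any) -> None:
        super().__init__(object_hook=object_hook or self._decode_object, **kwargs)

    @staticmethod
    def _decode_object(d: Dict[str, Any]) -> Any:
        if set(d) == {"__datetime__"}:
            return datetime.fromisoformat(d["__datetime__"])
        return d


state = {"b": 1, "a": datetime(2020, 1, 2, 3, 4, 5)}
encoded = DatetimeEncoderSketch(sort_keys=True).encode(state)
decoded = DatetimeDecoderSketch().decode(encoded)
```

Passing sort_keys=True makes the encoded text deterministic, which matters if the serialized state is later hashed or compared.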