domain_event.py 620 B

123456789101112131415161718192021222324252627
  1. # (c) Nelen & Schuurmans
  2. from typing import Awaitable
  3. from typing import Callable
  4. from typing import Type
  5. from typing import TypeVar
  6. import blinker
  7. __all__ = ["DomainEvent"]
  8. T = TypeVar("T", bound="DomainEvent")
  9. class DomainEvent:
  10. @classmethod
  11. def _signal(cls) -> blinker.Signal:
  12. return blinker.signal(cls.__name__)
  13. @classmethod
  14. def register_handler(
  15. cls: Type[T], receiver: Callable[[T], Awaitable[None]]
  16. ) -> Callable[[T], Awaitable[None]]:
  17. return cls._signal().connect(receiver)
  18. async def send_async(self) -> None:
  19. await self._signal().send_async(self)