internal_gateway.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # -*- coding: utf-8 -*-
  2. # (c) Nelen & Schuurmans
  3. from abc import abstractmethod
  4. from abc import abstractproperty
  5. from typing import Generic
  6. from typing import List
  7. from typing import Optional
  8. from typing import TypeVar
  9. from clean_python.base.application.manage import Manage
  10. from clean_python.base.domain.exceptions import BadRequest
  11. from clean_python.base.domain.exceptions import DoesNotExist
  12. from clean_python.base.domain.pagination import PageOptions
  13. from clean_python.base.domain.root_entity import RootEntity
  14. from clean_python.base.domain.value_object import ValueObject
  15. from clean_python.base.infrastructure.gateway import Filter
  16. E = TypeVar("E", bound=RootEntity) # External
  17. T = TypeVar("T", bound=ValueObject) # Internal
  18. # don't subclass Gateway; Gateway makes Json objects
  19. class InternalGateway(Generic[E, T]):
  20. @abstractproperty
  21. def manage(self) -> Manage[E]:
  22. raise NotImplementedError()
  23. @abstractmethod
  24. def _map(self, obj: E) -> T:
  25. raise NotImplementedError()
  26. async def get(self, id: int) -> Optional[T]:
  27. try:
  28. result = await self.manage.retrieve(id)
  29. except DoesNotExist:
  30. return None
  31. else:
  32. return self._map(result)
  33. async def filter(
  34. self, filters: List[Filter], params: Optional[PageOptions] = None
  35. ) -> List[T]:
  36. page = await self.manage.filter(filters, params)
  37. return [self._map(x) for x in page.items]
  38. async def add(self, item: T) -> T:
  39. try:
  40. created = await self.manage.create(item.dict())
  41. except BadRequest as e:
  42. raise ValueError(e)
  43. return self._map(created)
  44. async def remove(self, id) -> bool:
  45. return await self.manage.destroy(id)
  46. async def count(self, filters: List[Filter]) -> int:
  47. return await self.manage.count(filters)
  48. async def exists(self, filters: List[Filter]) -> bool:
  49. return await self.manage.exists(filters)