internal_gateway.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # (c) Nelen & Schuurmans
  2. from abc import abstractmethod
  3. from abc import abstractproperty
  4. from typing import Generic
  5. from typing import List
  6. from typing import Optional
  7. from typing import TypeVar
  8. from clean_python.base.application.manage import Manage
  9. from clean_python.base.domain import BadRequest
  10. from clean_python.base.domain import DoesNotExist
  11. from clean_python.base.domain import Filter
  12. from clean_python.base.domain import Json
  13. from clean_python.base.domain import PageOptions
  14. from clean_python.base.domain import RootEntity
  15. from clean_python.base.domain import ValueObject
  16. __all__ = ["InternalGateway"]
  17. E = TypeVar("E", bound=RootEntity) # External
  18. T = TypeVar("T", bound=ValueObject) # Internal
  19. # don't subclass Gateway; Gateway makes Json objects
  20. class InternalGateway(Generic[E, T]):
  21. @abstractproperty
  22. def manage(self) -> Manage[E]:
  23. raise NotImplementedError()
  24. @abstractmethod
  25. def _map(self, obj: E) -> T:
  26. raise NotImplementedError()
  27. async def get(self, id: int) -> Optional[T]:
  28. try:
  29. result = await self.manage.retrieve(id)
  30. except DoesNotExist:
  31. return None
  32. else:
  33. return self._map(result)
  34. async def filter(
  35. self, filters: List[Filter], params: Optional[PageOptions] = None
  36. ) -> List[T]:
  37. page = await self.manage.filter(filters, params)
  38. return [self._map(x) for x in page.items]
  39. async def add(self, item: T) -> T:
  40. try:
  41. created = await self.manage.create(item.model_dump())
  42. except BadRequest as e:
  43. raise ValueError(e)
  44. return self._map(created)
  45. async def remove(self, id) -> bool:
  46. return await self.manage.destroy(id)
  47. async def count(self, filters: List[Filter]) -> int:
  48. return await self.manage.count(filters)
  49. async def exists(self, filters: List[Filter]) -> bool:
  50. return await self.manage.exists(filters)
  51. async def update(self, values: Json) -> T:
  52. values = values.copy()
  53. id_ = values.pop("id", None)
  54. if id_ is None:
  55. raise DoesNotExist("item", id_)
  56. try:
  57. updated = await self.manage.update(id_, values)
  58. except BadRequest as e:
  59. raise ValueError(e)
  60. return self._map(updated)