internal_gateway.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. # (c) Nelen & Schuurmans
  2. from abc import abstractmethod
  3. from abc import abstractproperty
  4. from datetime import datetime
  5. from typing import Generic
  6. from typing import List
  7. from typing import Optional
  8. from typing import TypeVar
  9. from clean_python.base.application.manage import Manage
  10. from clean_python.base.domain import BadRequest
  11. from clean_python.base.domain import DoesNotExist
  12. from clean_python.base.domain import Filter
  13. from clean_python.base.domain import Gateway
  14. from clean_python.base.domain import Id
  15. from clean_python.base.domain import Json
  16. from clean_python.base.domain import PageOptions
  17. from clean_python.base.domain import RootEntity
  18. __all__ = ["InternalGateway"]
  19. T = TypeVar("T", bound=RootEntity) # External
  20. class InternalGateway(Gateway, Generic[T]):
  21. @abstractproperty
  22. def manage(self) -> Manage[T]:
  23. raise NotImplementedError()
  24. @abstractmethod
  25. def to_internal(self, obj: T) -> Json:
  26. raise NotImplementedError()
  27. def to_external(self, values: Json) -> Json:
  28. return values
  29. async def filter(
  30. self, filters: List[Filter], params: Optional[PageOptions] = None
  31. ) -> List[Json]:
  32. page = await self.manage.filter(filters, params)
  33. return [self.to_internal(x) for x in page.items]
  34. async def add(self, item: Json) -> Json:
  35. try:
  36. created = await self.manage.create(self.to_external(item))
  37. except BadRequest as e:
  38. raise ValueError(e)
  39. return self.to_internal(created)
  40. async def remove(self, id: Id) -> bool:
  41. return await self.manage.destroy(id)
  42. async def count(self, filters: List[Filter]) -> int:
  43. return await self.manage.count(filters)
  44. async def exists(self, filters: List[Filter]) -> bool:
  45. return await self.manage.exists(filters)
  46. async def update(
  47. self, item: Json, if_unmodified_since: Optional[datetime] = None
  48. ) -> Json:
  49. assert if_unmodified_since is None # unsupported
  50. values = self.to_external(item)
  51. id_ = values.pop("id", None)
  52. if id_ is None:
  53. raise DoesNotExist("item", id_)
  54. try:
  55. updated = await self.manage.update(id_, values)
  56. except BadRequest as e:
  57. raise ValueError(e)
  58. return self.to_internal(updated)