manage.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # (c) Nelen & Schuurmans
  2. from typing import Any
  3. from typing import Generic
  4. from typing import List
  5. from typing import Optional
  6. from typing import Type
  7. from typing import TypeVar
  8. import backoff
  9. from clean_python.base.domain import Conflict
  10. from clean_python.base.domain import Filter
  11. from clean_python.base.domain import Id
  12. from clean_python.base.domain import Json
  13. from clean_python.base.domain import Page
  14. from clean_python.base.domain import PageOptions
  15. from clean_python.base.domain import Repository
  16. from clean_python.base.domain import RootEntity
  17. T = TypeVar("T", bound=RootEntity)
  18. __all__ = ["Manage"]
  19. class Manage(Generic[T]):
  20. repo: Repository[T]
  21. entity: Type[T]
  22. def __init__(self, repo: Optional[Repository[T]] = None):
  23. assert repo is not None
  24. self.repo = repo
  25. def __init_subclass__(cls) -> None:
  26. (base,) = cls.__orig_bases__ # type: ignore
  27. (entity,) = base.__args__
  28. assert issubclass(entity, RootEntity)
  29. super().__init_subclass__()
  30. cls.entity = entity
  31. async def retrieve(self, id: Id) -> T:
  32. return await self.repo.get(id)
  33. async def create(self, values: Json) -> T:
  34. return await self.repo.add(values)
  35. async def update(self, id: Id, values: Json, retry_on_conflict: bool = True) -> T:
  36. """This update has a built-in retry function that can be switched off.
  37. This because some gateways (SQLGateway, ApiGateway) may raise Conflict
  38. errors in case there are concurrency issues. The backoff strategy assumes that
  39. we can retry immediately (because the conflict is gone immediately), but it
  40. does add some jitter between 0 and 200 ms to avoid many competing processes.
  41. If the repo.update is not idempotent (which is atypical), retries should be
  42. switched off.
  43. """
  44. if retry_on_conflict:
  45. return await self._update_with_retries(id, values)
  46. else:
  47. return await self.repo.update(id, values)
  48. @backoff.on_exception(backoff.constant, Conflict, max_tries=10, interval=0.2)
  49. async def _update_with_retries(self, id: Id, values: Json) -> T:
  50. return await self.repo.update(id, values)
  51. async def destroy(self, id: Id) -> bool:
  52. return await self.repo.remove(id)
  53. async def list(self, params: Optional[PageOptions] = None) -> Page[T]:
  54. return await self.repo.all(params)
  55. async def by(
  56. self, key: str, value: Any, params: Optional[PageOptions] = None
  57. ) -> Page[T]:
  58. return await self.repo.by(key, value, params=params)
  59. async def filter(
  60. self, filters: List[Filter], params: Optional[PageOptions] = None
  61. ) -> Page[T]:
  62. return await self.repo.filter(filters, params=params)
  63. async def count(self, filters: List[Filter]) -> int:
  64. return await self.repo.count(filters)
  65. async def exists(self, filters: List[Filter]) -> bool:
  66. return await self.repo.exists(filters)