123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- # (c) Nelen & Schuurmans
- from typing import Any
- from typing import Generic
- from typing import List
- from typing import Optional
- from typing import Type
- from typing import TypeVar
- import backoff
- from clean_python.base.domain import Conflict
- from clean_python.base.domain import Filter
- from clean_python.base.domain import Id
- from clean_python.base.domain import Json
- from clean_python.base.domain import Page
- from clean_python.base.domain import PageOptions
- from clean_python.base.domain import Repository
- from clean_python.base.domain import RootEntity
- T = TypeVar("T", bound=RootEntity)
- __all__ = ["Manage"]
- class Manage(Generic[T]):
- repo: Repository[T]
- entity: Type[T]
- def __init__(self, repo: Optional[Repository[T]] = None):
- assert repo is not None
- self.repo = repo
- def __init_subclass__(cls) -> None:
- (base,) = cls.__orig_bases__ # type: ignore
- (entity,) = base.__args__
- assert issubclass(entity, RootEntity)
- super().__init_subclass__()
- cls.entity = entity
- async def retrieve(self, id: Id) -> T:
- return await self.repo.get(id)
- async def create(self, values: Json) -> T:
- return await self.repo.add(values)
- async def update(self, id: Id, values: Json, retry_on_conflict: bool = True) -> T:
- """This update has a built-in retry function that can be switched off.
- This because some gateways (SQLGateway, ApiGateway) may raise Conflict
- errors in case there are concurrency issues. The backoff strategy assumes that
- we can retry immediately (because the conflict is gone immediately), but it
- does add some jitter between 0 and 200 ms to avoid many competing processes.
- If the repo.update is not idempotent (which is atypical), retries should be
- switched off.
- """
- if retry_on_conflict:
- return await self._update_with_retries(id, values)
- else:
- return await self.repo.update(id, values)
- @backoff.on_exception(backoff.constant, Conflict, max_tries=10, interval=0.2)
- async def _update_with_retries(self, id: Id, values: Json) -> T:
- return await self.repo.update(id, values)
- async def destroy(self, id: Id) -> bool:
- return await self.repo.remove(id)
- async def list(self, params: Optional[PageOptions] = None) -> Page[T]:
- return await self.repo.all(params)
- async def by(
- self, key: str, value: Any, params: Optional[PageOptions] = None
- ) -> Page[T]:
- return await self.repo.by(key, value, params=params)
- async def filter(
- self, filters: List[Filter], params: Optional[PageOptions] = None
- ) -> Page[T]:
- return await self.repo.filter(filters, params=params)
- async def count(self, filters: List[Filter]) -> int:
- return await self.repo.count(filters)
- async def exists(self, filters: List[Filter]) -> bool:
- return await self.repo.exists(filters)
|