| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 | # (c) Nelen & Schuurmansfrom typing import Anyfrom typing import Genericfrom typing import Listfrom typing import Optionalfrom typing import Typefrom typing import TypeVarimport backofffrom clean_python.base.domain import Conflictfrom clean_python.base.domain import Filterfrom clean_python.base.domain import Idfrom clean_python.base.domain import Jsonfrom clean_python.base.domain import Pagefrom clean_python.base.domain import PageOptionsfrom clean_python.base.domain import Repositoryfrom clean_python.base.domain import RootEntityT = TypeVar("T", bound=RootEntity)__all__ = ["Manage"]class Manage(Generic[T]):    repo: Repository[T]    entity: Type[T]    def __init__(self, repo: Optional[Repository[T]] = None):        assert repo is not None        self.repo = repo    def __init_subclass__(cls) -> None:        (base,) = cls.__orig_bases__  # type: ignore        (entity,) = base.__args__        assert issubclass(entity, RootEntity)        super().__init_subclass__()        cls.entity = entity    async def retrieve(self, id: Id) -> T:        return await self.repo.get(id)    async def create(self, values: Json) -> T:        return await self.repo.add(values)    async def update(self, id: Id, values: Json, retry_on_conflict: bool = True) -> T:        """This update has a built-in retry function that can be switched off.        This because some gateways (SQLGateway, ApiGateway) may raise Conflict        errors in case there are concurrency issues. The backoff strategy assumes that        we can retry immediately (because the conflict is gone immediately), but it        does add some jitter between 0 and 200 ms to avoid many competing processes.        If the repo.update is not idempotent (which is atypical), retries should be        switched off.        """        if retry_on_conflict:            return await self._update_with_retries(id, values)        else:            return await self.repo.update(id, values)    @backoff.on_exception(backoff.constant, Conflict, max_tries=10, interval=0.2)    async def _update_with_retries(self, id: Id, values: Json) -> T:        return await self.repo.update(id, values)    async def destroy(self, id: Id) -> bool:        return await self.repo.remove(id)    async def list(self, params: Optional[PageOptions] = None) -> Page[T]:        return await self.repo.all(params)    async def by(        self, key: str, value: Any, params: Optional[PageOptions] = None    ) -> Page[T]:        return await self.repo.by(key, value, params=params)    async def filter(        self, filters: List[Filter], params: Optional[PageOptions] = None    ) -> Page[T]:        return await self.repo.filter(filters, params=params)    async def count(self, filters: List[Filter]) -> int:        return await self.repo.count(filters)    async def exists(self, filters: List[Filter]) -> bool:        return await self.repo.exists(filters)
 |