| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 | # (c) Nelen & Schuurmansfrom copy import deepcopyfrom datetime import datetimefrom typing import Listfrom typing import Optionalfrom clean_python.base.domain import AlreadyExistsfrom clean_python.base.domain import Conflictfrom clean_python.base.domain import DoesNotExistfrom clean_python.base.domain import Filterfrom clean_python.base.domain import Gatewayfrom clean_python.base.domain import Idfrom clean_python.base.domain import Jsonfrom clean_python.base.domain import PageOptions__all__ = ["InMemoryGateway"]class InMemoryGateway(Gateway):    """For testing purposes"""    def __init__(self, data: List[Json]):        self.data = {x["id"]: deepcopy(x) for x in data}    def _get_next_id(self) -> int:        if len(self.data) == 0:            return 1        else:            return max(self.data) + 1    def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:        objs = sorted(            objs,            key=lambda x: (x.get(params.order_by) is None, x.get(params.order_by)),            reverse=not params.ascending,        )        return objs[params.offset : params.offset + params.limit]    async def filter(        self, filters: List[Filter], params: Optional[PageOptions] = None    ) -> List[Json]:        result = []        for x in self.data.values():            for filter in filters:                if x.get(filter.field) not in filter.values:                    break            else:                result.append(deepcopy(x))        if params is not None:            result = self._paginate(result, params)        return result    async def add(self, item: Json) -> Json:        item = item.copy()        id_ = item.pop("id", None)        # autoincrement (like SQL does)        if id_ is None:            id_ = self._get_next_id()        elif id_ in self.data:            raise AlreadyExists(id_)        self.data[id_] = {"id": id_, **item}        return deepcopy(self.data[id_])    async def update(        self, item: Json, if_unmodified_since: Optional[datetime] = None    ) -> Json:        _id = item.get("id")        if _id is None or _id not in self.data:            raise DoesNotExist("item", _id)        existing = self.data[_id]        if if_unmodified_since and existing.get("updated_at") != if_unmodified_since:            raise Conflict()        existing.update(item)        return deepcopy(existing)    async def remove(self, id: Id) -> bool:        if id not in self.data:            return False        del self.data[id]        return True
 |