# (c) Nelen & Schuurmans

from copy import deepcopy
from datetime import datetime
from typing import List
from typing import Optional

from clean_python.base.domain import AlreadyExists
from clean_python.base.domain import Conflict
from clean_python.base.domain import DoesNotExist
from clean_python.base.domain import Filter
from clean_python.base.domain import Gateway
from clean_python.base.domain import Id
from clean_python.base.domain import Json
from clean_python.base.domain import PageOptions
from clean_python.base.domain import SyncGateway

__all__ = ["InMemoryGateway", "InMemorySyncGateway"]


class InMemoryGateway(Gateway):
    """Async gateway that keeps its records in a dict; for testing purposes.

    Records are stored in ``self.data`` keyed by their ``"id"`` value. Data is
    deep-copied on the way in and on the way out, so callers never share
    mutable state with the gateway.
    """

    def __init__(self, data: List[Json]):
        """Populate the store with deep copies of ``data``.

        Every record in ``data`` must have an ``"id"`` key.
        """
        self.data = {x["id"]: deepcopy(x) for x in data}

    def _get_next_id(self) -> int:
        """Return the largest stored id plus one (1 for an empty store)."""
        return max(self.data, default=0) + 1

    def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:
        """Sort ``objs`` by ``params.order_by`` and return the requested page.

        The sort key puts records whose ordering field is missing or ``None``
        after all others when ascending (and before them when descending,
        because the whole ordering is reversed).
        """
        objs = sorted(
            objs,
            key=lambda x: (x.get(params.order_by) is None, x.get(params.order_by)),
            reverse=not params.ascending,
        )
        return objs[params.offset : params.offset + params.limit]

    async def filter(
        self, filters: List[Filter], params: Optional[PageOptions] = None
    ) -> List[Json]:
        """Return deep copies of the records that satisfy all ``filters``.

        A record satisfies a filter when its value for ``filter.field``
        (``None`` if absent) is contained in ``filter.values``. With an empty
        ``filters`` list every record matches. The result is sorted and sliced
        only if ``params`` is given.
        """
        result = [
            deepcopy(record)
            for record in self.data.values()
            if all(record.get(f.field) in f.values for f in filters)
        ]
        if params is not None:
            result = self._paginate(result, params)
        return result

    async def add(self, item: Json) -> Json:
        """Store a new record and return a deep copy of what was stored.

        If ``item`` has no ``"id"`` (or it is ``None``) an id is generated.

        Raises:
            AlreadyExists: if the supplied id is already present.
        """
        # Deep copy so that nested values are not shared with the caller.
        item = deepcopy(item)
        id_ = item.pop("id", None)
        # autoincrement (like SQL does)
        if id_ is None:
            id_ = self._get_next_id()
        elif id_ in self.data:
            raise AlreadyExists(id_)

        self.data[id_] = {"id": id_, **item}
        return deepcopy(self.data[id_])

    async def update(
        self, item: Json, if_unmodified_since: Optional[datetime] = None
    ) -> Json:
        """Merge ``item`` into the stored record with the same ``"id"``.

        Keys absent from ``item`` keep their stored values. Returns a deep
        copy of the merged record.

        Raises:
            DoesNotExist: if ``item`` has no id or the id is not stored.
            Conflict: if ``if_unmodified_since`` is given and differs from the
                stored record's ``"updated_at"`` value.
        """
        _id = item.get("id")
        if _id is None or _id not in self.data:
            raise DoesNotExist("item", _id)
        existing = self.data[_id]
        if (
            if_unmodified_since is not None
            and existing.get("updated_at") != if_unmodified_since
        ):
            raise Conflict()
        # Deep copy so that nested values are not shared with the caller.
        existing.update(deepcopy(item))
        return deepcopy(existing)

    async def remove(self, id: Id) -> bool:
        """Delete the record with the given id; return whether it existed."""
        if id not in self.data:
            return False
        del self.data[id]
        return True


# This is a copy-paste of InMemoryGateway:
class InMemorySyncGateway(SyncGateway):
    """Sync gateway that keeps its records in a dict; for testing purposes.

    Records are stored in ``self.data`` keyed by their ``"id"`` value. Data is
    deep-copied on the way in and on the way out, so callers never share
    mutable state with the gateway.
    """

    def __init__(self, data: List[Json]):
        """Populate the store with deep copies of ``data``.

        Every record in ``data`` must have an ``"id"`` key.
        """
        self.data = {x["id"]: deepcopy(x) for x in data}

    def _get_next_id(self) -> int:
        """Return the largest stored id plus one (1 for an empty store)."""
        return max(self.data, default=0) + 1

    def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:
        """Sort ``objs`` by ``params.order_by`` and return the requested page.

        The sort key puts records whose ordering field is missing or ``None``
        after all others when ascending (and before them when descending,
        because the whole ordering is reversed).
        """
        objs = sorted(
            objs,
            key=lambda x: (x.get(params.order_by) is None, x.get(params.order_by)),
            reverse=not params.ascending,
        )
        return objs[params.offset : params.offset + params.limit]

    def filter(
        self, filters: List[Filter], params: Optional[PageOptions] = None
    ) -> List[Json]:
        """Return deep copies of the records that satisfy all ``filters``.

        A record satisfies a filter when its value for ``filter.field``
        (``None`` if absent) is contained in ``filter.values``. With an empty
        ``filters`` list every record matches. The result is sorted and sliced
        only if ``params`` is given.
        """
        result = [
            deepcopy(record)
            for record in self.data.values()
            if all(record.get(f.field) in f.values for f in filters)
        ]
        if params is not None:
            result = self._paginate(result, params)
        return result

    def add(self, item: Json) -> Json:
        """Store a new record and return a deep copy of what was stored.

        If ``item`` has no ``"id"`` (or it is ``None``) an id is generated.

        Raises:
            AlreadyExists: if the supplied id is already present.
        """
        # Deep copy so that nested values are not shared with the caller.
        item = deepcopy(item)
        id_ = item.pop("id", None)
        # autoincrement (like SQL does)
        if id_ is None:
            id_ = self._get_next_id()
        elif id_ in self.data:
            raise AlreadyExists(id_)

        self.data[id_] = {"id": id_, **item}
        return deepcopy(self.data[id_])

    def update(
        self, item: Json, if_unmodified_since: Optional[datetime] = None
    ) -> Json:
        """Merge ``item`` into the stored record with the same ``"id"``.

        Keys absent from ``item`` keep their stored values. Returns a deep
        copy of the merged record.

        Raises:
            DoesNotExist: if ``item`` has no id or the id is not stored.
            Conflict: if ``if_unmodified_since`` is given and differs from the
                stored record's ``"updated_at"`` value.
        """
        _id = item.get("id")
        if _id is None or _id not in self.data:
            raise DoesNotExist("item", _id)
        existing = self.data[_id]
        if (
            if_unmodified_since is not None
            and existing.get("updated_at") != if_unmodified_since
        ):
            raise Conflict()
        # Deep copy so that nested values are not shared with the caller.
        existing.update(deepcopy(item))
        return deepcopy(existing)

    def remove(self, id: Id) -> bool:
        """Delete the record with the given id; return whether it existed."""
        if id not in self.data:
            return False
        del self.data[id]
        return True