123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- # (c) Nelen & Schuurmans
- from copy import deepcopy
- from datetime import datetime
- from typing import List
- from typing import Optional
- from clean_python.base.domain import AlreadyExists
- from clean_python.base.domain import Conflict
- from clean_python.base.domain import DoesNotExist
- from clean_python.base.domain import Filter
- from clean_python.base.domain import Gateway
- from clean_python.base.domain import Id
- from clean_python.base.domain import Json
- from clean_python.base.domain import PageOptions
- from clean_python.base.domain import SyncGateway
- __all__ = ["InMemoryGateway", "InMemorySyncGateway"]
class InMemoryGateway(Gateway):
    """Asynchronous gateway backed by an in-memory dict. For testing purposes.

    Records live in ``self.data``, keyed by their ``"id"`` value. Records are
    deep-copied when they are stored and when they are returned, so the store
    never shares mutable (nested) state with its callers.
    """

    def __init__(self, data: List[Json]):
        """Seed the store with deep copies of the records in ``data``.

        Every record is indexed by ``record["id"]``; a record without an
        ``"id"`` key raises ``KeyError``.
        """
        self.data = {x["id"]: deepcopy(x) for x in data}

    def _get_next_id(self) -> int:
        """Return the highest stored id plus one, or 1 for an empty store."""
        return max(self.data, default=0) + 1

    def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:
        """Sort ``objs`` on ``params.order_by`` and slice out the requested page.

        The sort direction follows ``params.ascending``; the page is the slice
        ``[params.offset, params.offset + params.limit)`` of the sorted list.
        """

        def sort_key(obj: Json):
            value = obj.get(params.order_by)
            # The leading flag groups missing/None values together (last when
            # ascending) so that None is never compared against a real value.
            return (value is None, value)

        ordered = sorted(objs, key=sort_key, reverse=not params.ascending)
        return ordered[params.offset : params.offset + params.limit]

    async def filter(
        self, filters: List[Filter], params: Optional[PageOptions] = None
    ) -> List[Json]:
        """Return deep copies of the records that satisfy every filter.

        A record matches a filter when its value for ``filter.field`` (``None``
        if the field is absent) is contained in ``filter.values``. An empty
        ``filters`` list matches every record. When ``params`` is given, the
        matches are sorted and paginated accordingly.
        """
        result = [
            deepcopy(record)
            for record in self.data.values()
            if all(record.get(f.field) in f.values for f in filters)
        ]
        if params is not None:
            result = self._paginate(result, params)
        return result

    async def add(self, item: Json) -> Json:
        """Store ``item`` as a new record and return a deep copy of it.

        If ``item`` has no ``"id"`` (or it is ``None``), the next free integer
        id is assigned. The caller's ``item`` is not modified.

        Raises:
            AlreadyExists: if the supplied id is already present in the store.
        """
        item = item.copy()  # shallow copy: keep the pop() off the caller's dict
        id_ = item.pop("id", None)
        # autoincrement (like SQL does)
        if id_ is None:
            id_ = self._get_next_id()
        elif id_ in self.data:
            raise AlreadyExists(id_)
        # Deep copy so nested values in the store are not aliased to the
        # caller's objects (consistent with __init__ and the returned copies).
        self.data[id_] = {"id": id_, **deepcopy(item)}
        return deepcopy(self.data[id_])

    async def update(
        self, item: Json, if_unmodified_since: Optional[datetime] = None
    ) -> Json:
        """Merge ``item`` into the stored record with the same ``"id"``.

        Only the keys present in ``item`` are overwritten. Returns a deep copy
        of the record after the merge.

        Raises:
            DoesNotExist: if ``item`` has no ``"id"`` or the id is not stored.
            Conflict: if ``if_unmodified_since`` is given (truthy) and differs
                from the stored record's ``"updated_at"`` value.
        """
        _id = item.get("id")
        if _id is None or _id not in self.data:
            raise DoesNotExist("item", _id)
        existing = self.data[_id]
        if if_unmodified_since and existing.get("updated_at") != if_unmodified_since:
            raise Conflict()
        # Deep copy so later mutation of the caller's nested values cannot
        # leak into the stored record.
        existing.update(deepcopy(item))
        return deepcopy(existing)

    async def remove(self, id: Id) -> bool:
        """Delete the record with the given id.

        Returns ``True`` if a record was removed and ``False`` if the id was
        not present.
        """
        if id not in self.data:
            return False
        del self.data[id]
        return True
- # This is a copy-paste of InMemoryGateway:
class InMemorySyncGateway(SyncGateway):
    """Synchronous gateway backed by an in-memory dict. For testing purposes.

    Records live in ``self.data``, keyed by their ``"id"`` value. Records are
    deep-copied when they are stored and when they are returned, so the store
    never shares mutable (nested) state with its callers.
    """

    def __init__(self, data: List[Json]):
        """Seed the store with deep copies of the records in ``data``.

        Every record is indexed by ``record["id"]``; a record without an
        ``"id"`` key raises ``KeyError``.
        """
        self.data = {x["id"]: deepcopy(x) for x in data}

    def _get_next_id(self) -> int:
        """Return the highest stored id plus one, or 1 for an empty store."""
        return max(self.data, default=0) + 1

    def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:
        """Sort ``objs`` on ``params.order_by`` and slice out the requested page.

        The sort direction follows ``params.ascending``; the page is the slice
        ``[params.offset, params.offset + params.limit)`` of the sorted list.
        """

        def sort_key(obj: Json):
            value = obj.get(params.order_by)
            # The leading flag groups missing/None values together (last when
            # ascending) so that None is never compared against a real value.
            return (value is None, value)

        ordered = sorted(objs, key=sort_key, reverse=not params.ascending)
        return ordered[params.offset : params.offset + params.limit]

    def filter(
        self, filters: List[Filter], params: Optional[PageOptions] = None
    ) -> List[Json]:
        """Return deep copies of the records that satisfy every filter.

        A record matches a filter when its value for ``filter.field`` (``None``
        if the field is absent) is contained in ``filter.values``. An empty
        ``filters`` list matches every record. When ``params`` is given, the
        matches are sorted and paginated accordingly.
        """
        result = [
            deepcopy(record)
            for record in self.data.values()
            if all(record.get(f.field) in f.values for f in filters)
        ]
        if params is not None:
            result = self._paginate(result, params)
        return result

    def add(self, item: Json) -> Json:
        """Store ``item`` as a new record and return a deep copy of it.

        If ``item`` has no ``"id"`` (or it is ``None``), the next free integer
        id is assigned. The caller's ``item`` is not modified.

        Raises:
            AlreadyExists: if the supplied id is already present in the store.
        """
        item = item.copy()  # shallow copy: keep the pop() off the caller's dict
        id_ = item.pop("id", None)
        # autoincrement (like SQL does)
        if id_ is None:
            id_ = self._get_next_id()
        elif id_ in self.data:
            raise AlreadyExists(id_)
        # Deep copy so nested values in the store are not aliased to the
        # caller's objects (consistent with __init__ and the returned copies).
        self.data[id_] = {"id": id_, **deepcopy(item)}
        return deepcopy(self.data[id_])

    def update(
        self, item: Json, if_unmodified_since: Optional[datetime] = None
    ) -> Json:
        """Merge ``item`` into the stored record with the same ``"id"``.

        Only the keys present in ``item`` are overwritten. Returns a deep copy
        of the record after the merge.

        Raises:
            DoesNotExist: if ``item`` has no ``"id"`` or the id is not stored.
            Conflict: if ``if_unmodified_since`` is given (truthy) and differs
                from the stored record's ``"updated_at"`` value.
        """
        _id = item.get("id")
        if _id is None or _id not in self.data:
            raise DoesNotExist("item", _id)
        existing = self.data[_id]
        if if_unmodified_since and existing.get("updated_at") != if_unmodified_since:
            raise Conflict()
        # Deep copy so later mutation of the caller's nested values cannot
        # leak into the stored record.
        existing.update(deepcopy(item))
        return deepcopy(existing)

    def remove(self, id: Id) -> bool:
        """Delete the record with the given id.

        Returns ``True`` if a record was removed and ``False`` if the id was
        not present.
        """
        if id not in self.data:
            return False
        del self.data[id]
        return True
|