in_memory_gateway.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # (c) Nelen & Schuurmans
  2. from copy import deepcopy
  3. from datetime import datetime
  4. from typing import List
  5. from typing import Optional
  6. from clean_python.base.domain import AlreadyExists
  7. from clean_python.base.domain import Conflict
  8. from clean_python.base.domain import DoesNotExist
  9. from clean_python.base.domain import Filter
  10. from clean_python.base.domain import Gateway
  11. from clean_python.base.domain import Id
  12. from clean_python.base.domain import Json
  13. from clean_python.base.domain import PageOptions
  14. __all__ = ["InMemoryGateway"]
  15. class InMemoryGateway(Gateway):
  16. """For testing purposes"""
  17. def __init__(self, data: List[Json]):
  18. self.data = {x["id"]: deepcopy(x) for x in data}
  19. def _get_next_id(self) -> int:
  20. if len(self.data) == 0:
  21. return 1
  22. else:
  23. return max(self.data) + 1
  24. def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:
  25. objs = sorted(
  26. objs,
  27. key=lambda x: (x.get(params.order_by) is None, x.get(params.order_by)),
  28. reverse=not params.ascending,
  29. )
  30. return objs[params.offset : params.offset + params.limit]
  31. async def filter(
  32. self, filters: List[Filter], params: Optional[PageOptions] = None
  33. ) -> List[Json]:
  34. result = []
  35. for x in self.data.values():
  36. for filter in filters:
  37. if x.get(filter.field) not in filter.values:
  38. break
  39. else:
  40. result.append(deepcopy(x))
  41. if params is not None:
  42. result = self._paginate(result, params)
  43. return result
  44. async def add(self, item: Json) -> Json:
  45. item = item.copy()
  46. id_ = item.pop("id", None)
  47. # autoincrement (like SQL does)
  48. if id_ is None:
  49. id_ = self._get_next_id()
  50. elif id_ in self.data:
  51. raise AlreadyExists(id_)
  52. self.data[id_] = {"id": id_, **item}
  53. return deepcopy(self.data[id_])
  54. async def update(
  55. self, item: Json, if_unmodified_since: Optional[datetime] = None
  56. ) -> Json:
  57. _id = item.get("id")
  58. if _id is None or _id not in self.data:
  59. raise DoesNotExist("item", _id)
  60. existing = self.data[_id]
  61. if if_unmodified_since and existing.get("updated_at") != if_unmodified_since:
  62. raise Conflict()
  63. existing.update(item)
  64. return deepcopy(existing)
  65. async def remove(self, id: Id) -> bool:
  66. if id not in self.data:
  67. return False
  68. del self.data[id]
  69. return True