in_memory_gateway.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # (c) Nelen & Schuurmans
  2. from copy import deepcopy
  3. from datetime import datetime
  4. from typing import List
  5. from typing import Optional
  6. from clean_python.base.domain import AlreadyExists
  7. from clean_python.base.domain import Conflict
  8. from clean_python.base.domain import DoesNotExist
  9. from clean_python.base.domain import Filter
  10. from clean_python.base.domain import Gateway
  11. from clean_python.base.domain import Json
  12. from clean_python.base.domain import PageOptions
  13. __all__ = ["InMemoryGateway"]
  14. class InMemoryGateway(Gateway):
  15. """For testing purposes"""
  16. def __init__(self, data: List[Json]):
  17. self.data = {x["id"]: deepcopy(x) for x in data}
  18. def _get_next_id(self) -> int:
  19. if len(self.data) == 0:
  20. return 1
  21. else:
  22. return max(self.data) + 1
  23. def _paginate(self, objs: List[Json], params: PageOptions) -> List[Json]:
  24. objs = sorted(
  25. objs,
  26. key=lambda x: (x.get(params.order_by) is None, x.get(params.order_by)),
  27. reverse=not params.ascending,
  28. )
  29. return objs[params.offset : params.offset + params.limit]
  30. async def filter(
  31. self, filters: List[Filter], params: Optional[PageOptions] = None
  32. ) -> List[Json]:
  33. result = []
  34. for x in self.data.values():
  35. for filter in filters:
  36. if x.get(filter.field) not in filter.values:
  37. break
  38. else:
  39. result.append(deepcopy(x))
  40. if params is not None:
  41. result = self._paginate(result, params)
  42. return result
  43. async def add(self, item: Json) -> Json:
  44. item = item.copy()
  45. id_ = item.pop("id", None)
  46. # autoincrement (like SQL does)
  47. if id_ is None:
  48. id_ = self._get_next_id()
  49. elif id_ in self.data:
  50. raise AlreadyExists(id_)
  51. self.data[id_] = {"id": id_, **item}
  52. return deepcopy(self.data[id_])
  53. async def update(
  54. self, item: Json, if_unmodified_since: Optional[datetime] = None
  55. ) -> Json:
  56. _id = item.get("id")
  57. if _id is None or _id not in self.data:
  58. raise DoesNotExist("item", _id)
  59. existing = self.data[_id]
  60. if if_unmodified_since and existing.get("updated_at") != if_unmodified_since:
  61. raise Conflict()
  62. existing.update(item)
  63. return deepcopy(existing)
  64. async def remove(self, id: int) -> bool:
  65. if id not in self.data:
  66. return False
  67. del self.data[id]
  68. return True