gateway.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # (c) Nelen & Schuurmans
  2. from abc import ABC
  3. from datetime import datetime
  4. from typing import Callable
  5. from typing import List
  6. from typing import Optional
  7. from .exceptions import DoesNotExist
  8. from .filter import Filter
  9. from .pagination import PageOptions
  10. from .types import Id
  11. from .types import Json
  12. __all__ = ["Gateway", "SyncGateway"]
  13. class Gateway(ABC):
  14. async def filter(
  15. self, filters: List[Filter], params: Optional[PageOptions] = None
  16. ) -> List[Json]:
  17. raise NotImplementedError()
  18. async def count(self, filters: List[Filter]) -> int:
  19. return len(await self.filter(filters, params=None))
  20. async def exists(self, filters: List[Filter]) -> bool:
  21. return len(await self.filter(filters, params=PageOptions(limit=1))) > 0
  22. async def get(self, id: Id) -> Optional[Json]:
  23. result = await self.filter([Filter(field="id", values=[id])], params=None)
  24. return result[0] if result else None
  25. async def add(self, item: Json) -> Json:
  26. raise NotImplementedError()
  27. async def update(
  28. self, item: Json, if_unmodified_since: Optional[datetime] = None
  29. ) -> Json:
  30. raise NotImplementedError()
  31. async def update_transactional(self, id: Id, func: Callable[[Json], Json]) -> Json:
  32. existing = await self.get(id)
  33. if existing is None:
  34. raise DoesNotExist("record", id)
  35. return await self.update(
  36. func(existing), if_unmodified_since=existing["updated_at"]
  37. )
  38. async def upsert(self, item: Json) -> Json:
  39. try:
  40. return await self.update(item)
  41. except DoesNotExist:
  42. return await self.add(item)
  43. async def remove(self, id: Id) -> bool:
  44. raise NotImplementedError()
  45. # This is a copy-paste from clean_python.Gateway, but with all the async / await removed
  46. class SyncGateway:
  47. def filter(
  48. self, filters: List[Filter], params: Optional[PageOptions] = None
  49. ) -> List[Json]:
  50. raise NotImplementedError()
  51. def count(self, filters: List[Filter]) -> int:
  52. return len(self.filter(filters, params=None))
  53. def exists(self, filters: List[Filter]) -> bool:
  54. return len(self.filter(filters, params=PageOptions(limit=1))) > 0
  55. def get(self, id: Id) -> Optional[Json]:
  56. result = self.filter([Filter(field="id", values=[id])], params=None)
  57. return result[0] if result else None
  58. def add(self, item: Json) -> Json:
  59. raise NotImplementedError()
  60. def update(
  61. self, item: Json, if_unmodified_since: Optional[datetime] = None
  62. ) -> Json:
  63. raise NotImplementedError()
  64. def update_transactional(self, id: Id, func: Callable[[Json], Json]) -> Json:
  65. existing = self.get(id)
  66. if existing is None:
  67. raise DoesNotExist("record", id)
  68. return self.update(func(existing), if_unmodified_since=existing["updated_at"])
  69. def upsert(self, item: Json) -> Json:
  70. try:
  71. return self.update(item)
  72. except DoesNotExist:
  73. return self.add(item)
  74. def remove(self, id: Id) -> bool:
  75. raise NotImplementedError()