sql_gateway.py 12 KB


  1. # (c) Nelen & Schuurmans
  2. from contextlib import asynccontextmanager
  3. from datetime import datetime
  4. from typing import AsyncIterator
  5. from typing import Callable
  6. from typing import List
  7. from typing import Optional
  8. from typing import TypeVar
  9. import inject
  10. from sqlalchemy import and_
  11. from sqlalchemy import asc
  12. from sqlalchemy import delete
  13. from sqlalchemy import desc
  14. from sqlalchemy import func
  15. from sqlalchemy import select
  16. from sqlalchemy import Table
  17. from sqlalchemy import true
  18. from sqlalchemy import update
  19. from sqlalchemy.dialects.postgresql import insert
  20. from sqlalchemy.exc import IntegrityError
  21. from sqlalchemy.sql import Executable
  22. from sqlalchemy.sql.expression import ColumnElement
  23. from sqlalchemy.sql.expression import false
  24. from clean_python import AlreadyExists
  25. from clean_python import Conflict
  26. from clean_python import ctx
  27. from clean_python import DoesNotExist
  28. from clean_python import Filter
  29. from clean_python import Gateway
  30. from clean_python import Id
  31. from clean_python import Json
  32. from clean_python import PageOptions
  33. from .sql_provider import SQLDatabase
  34. from .sql_provider import SQLProvider
  35. __all__ = ["SQLGateway"]
  36. def _is_unique_violation_error_id(e: IntegrityError, id: int):
  37. # sqlalchemy wraps the asyncpg error
  38. msg = e.orig.args[0]
  39. return ("duplicate key value violates unique constraint" in msg) and (
  40. f"Key (id)=({id}) already exists." in msg
  41. )
  42. T = TypeVar("T", bound="SQLGateway")
  43. class SQLGateway(Gateway):
  44. table: Table
  45. nested: bool
  46. multitenant: bool
  47. def __init__(
  48. self,
  49. provider_override: Optional[SQLProvider] = None,
  50. nested: bool = False,
  51. ):
  52. self.provider_override = provider_override
  53. self.nested = nested
  54. @property
  55. def provider(self):
  56. return self.provider_override or inject.instance(SQLDatabase)
  57. def __init_subclass__(cls, table: Table, multitenant: bool = False) -> None:
  58. cls.table = table
  59. if multitenant and not hasattr(table.c, "tenant"):
  60. raise ValueError("Can't use a multitenant SQLGateway without tenant column")
  61. cls.multitenant = multitenant
  62. super().__init_subclass__()
  63. def rows_to_dict(self, rows: List[Json]) -> List[Json]:
  64. return rows
  65. def dict_to_row(self, obj: Json) -> Json:
  66. known = {c.key for c in self.table.c}
  67. result = {k: obj[k] for k in obj.keys() if k in known}
  68. if "id" in result and result["id"] is None:
  69. del result["id"]
  70. if self.multitenant:
  71. result["tenant"] = self.current_tenant
  72. return result
  73. @asynccontextmanager
  74. async def transaction(self: T) -> AsyncIterator[T]:
  75. if self.nested:
  76. yield self
  77. else:
  78. async with self.provider.transaction() as provider:
  79. yield self.__class__(provider, nested=True)
  80. @property
  81. def current_tenant(self) -> Optional[int]:
  82. if not self.multitenant:
  83. return None
  84. if ctx.tenant is None:
  85. raise RuntimeError(f"{self.__class__} requires a tenant in the context")
  86. return ctx.tenant.id
  87. async def get_related(self, items: List[Json]) -> None:
  88. pass
  89. async def set_related(self, item: Json, result: Json) -> None:
  90. pass
  91. async def execute(self, query: Executable) -> List[Json]:
  92. assert self.nested
  93. return self.rows_to_dict(await self.provider.execute(query))
  94. async def add(self, item: Json) -> Json:
  95. query = (
  96. insert(self.table).values(**self.dict_to_row(item)).returning(self.table)
  97. )
  98. async with self.transaction() as transaction:
  99. try:
  100. (result,) = await transaction.execute(query)
  101. except IntegrityError as e:
  102. id_ = item.get("id")
  103. if id_ is not None and _is_unique_violation_error_id(e, id_):
  104. raise AlreadyExists(id_)
  105. raise
  106. await transaction.set_related(item, result)
  107. return result
  108. async def update(
  109. self, item: Json, if_unmodified_since: Optional[datetime] = None
  110. ) -> Json:
  111. id_ = item.get("id")
  112. if id_ is None:
  113. raise DoesNotExist("record", id_)
  114. q = self._id_filter_to_sql(id_)
  115. if if_unmodified_since is not None:
  116. q &= self.table.c.updated_at == if_unmodified_since
  117. query = (
  118. update(self.table)
  119. .where(q)
  120. .values(**self.dict_to_row(item))
  121. .returning(self.table)
  122. )
  123. async with self.transaction() as transaction:
  124. result = await transaction.execute(query)
  125. if not result:
  126. if if_unmodified_since is not None:
  127. # note: the get() is to maybe raise DoesNotExist
  128. if await self.get(id_):
  129. raise Conflict()
  130. raise DoesNotExist("record", id_)
  131. await transaction.set_related(item, result[0])
  132. return result[0]
  133. async def _select_for_update(self, id: Id) -> Json:
  134. async with self.transaction() as transaction:
  135. result = await transaction.execute(
  136. select(self.table).with_for_update().where(self._id_filter_to_sql(id)),
  137. )
  138. if not result:
  139. raise DoesNotExist("record", id)
  140. await transaction.get_related(result)
  141. return result[0]
  142. async def update_transactional(self, id: Id, func: Callable[[Json], Json]) -> Json:
  143. async with self.transaction() as transaction:
  144. existing = await transaction._select_for_update(id)
  145. updated = func(existing)
  146. return await transaction.update(updated)
  147. async def upsert(self, item: Json) -> Json:
  148. if item.get("id") is None:
  149. return await self.add(item)
  150. values = self.dict_to_row(item)
  151. query = (
  152. insert(self.table)
  153. .values(**values)
  154. .on_conflict_do_update(
  155. index_elements=["id", "tenant"] if self.multitenant else ["id"],
  156. set_=values,
  157. )
  158. .returning(self.table)
  159. )
  160. async with self.transaction() as transaction:
  161. result = await transaction.execute(query)
  162. await transaction.set_related(item, result[0])
  163. return result[0]
  164. async def remove(self, id: Id) -> bool:
  165. query = (
  166. delete(self.table)
  167. .where(self._id_filter_to_sql(id))
  168. .returning(self.table.c.id)
  169. )
  170. async with self.transaction() as transaction:
  171. result = await transaction.execute(query)
  172. return bool(result)
  173. def _filter_to_sql(self, filter: Filter) -> ColumnElement:
  174. try:
  175. column = getattr(self.table.c, filter.field)
  176. except AttributeError:
  177. return false()
  178. if len(filter.values) == 0:
  179. return false()
  180. elif len(filter.values) == 1:
  181. return column == filter.values[0]
  182. else:
  183. return column.in_(filter.values)
  184. def _filters_to_sql(self, filters: List[Filter]) -> ColumnElement:
  185. qs = [self._filter_to_sql(x) for x in filters]
  186. if self.multitenant:
  187. qs.append(self.table.c.tenant == self.current_tenant)
  188. return and_(*qs)
  189. def _id_filter_to_sql(self, id: Id) -> ColumnElement:
  190. return self._filters_to_sql([Filter(field="id", values=[id])])
  191. async def filter(
  192. self, filters: List[Filter], params: Optional[PageOptions] = None
  193. ) -> List[Json]:
  194. query = select(self.table).where(self._filters_to_sql(filters))
  195. if params is not None:
  196. sort = asc(params.order_by) if params.ascending else desc(params.order_by)
  197. query = query.order_by(sort).limit(params.limit).offset(params.offset)
  198. async with self.transaction() as transaction:
  199. result = await transaction.execute(query)
  200. await transaction.get_related(result)
  201. return result
  202. async def count(self, filters: List[Filter]) -> int:
  203. query = (
  204. select(func.count().label("count"))
  205. .select_from(self.table)
  206. .where(self._filters_to_sql(filters))
  207. )
  208. async with self.transaction() as transaction:
  209. return (await transaction.execute(query))[0]["count"]
  210. async def exists(self, filters: List[Filter]) -> bool:
  211. query = (
  212. select(true().label("exists"))
  213. .select_from(self.table)
  214. .where(self._filters_to_sql(filters))
  215. .limit(1)
  216. )
  217. async with self.transaction() as transaction:
  218. return len(await transaction.execute(query)) > 0
  219. async def _get_related_one_to_many(
  220. self,
  221. items: List[Json],
  222. field_name: str,
  223. fk_name: str,
  224. ) -> None:
  225. """Fetch related objects for `items` and add them inplace.
  226. The result is `items` having an additional field containing a list of related
  227. objects which were retrieved from self in 1 SELECT query.
  228. Args:
  229. items: The items for which to fetch related objects. Changed inplace.
  230. field_name: The key in item to put the fetched related objects into.
  231. fk_name: The column name on the related object that refers to item["id"]
  232. Example:
  233. Writer has a one-to-many relation to books.
  234. >>> writers = [{"id": 2, "name": "John Doe"}]
  235. >>> _get_related_one_to_many(
  236. items=writers,
  237. related_gateway=BookSQLGateway,
  238. field_name="books",
  239. fk_name="writer_id",
  240. )
  241. >>> writers[0]
  242. {
  243. "id": 2,
  244. "name": "John Doe",
  245. "books": [
  246. {
  247. "id": 1",
  248. "title": "How to write an ORM",
  249. "writer_id": 2
  250. }
  251. ]
  252. }
  253. """
  254. assert not self.multitenant
  255. for x in items:
  256. x[field_name] = []
  257. item_lut = {x["id"]: x for x in items}
  258. related_objs = await self.filter(
  259. [Filter(field=fk_name, values=list(item_lut.keys()))]
  260. )
  261. for related_obj in related_objs:
  262. item_lut[related_obj[fk_name]][field_name].append(related_obj)
  263. async def _set_related_one_to_many(
  264. self,
  265. item: Json,
  266. result: Json,
  267. field_name: str,
  268. fk_name: str,
  269. ) -> None:
  270. """Set related objects for `item`
  271. This method first fetches the current situation and then adds / updates / removes
  272. where appropriate.
  273. Args:
  274. item: The item for which to set related objects.
  275. result: The dictionary to put the resulting (added / updated) objects into
  276. field_name: The key in result to put the (added / updated) related objects into.
  277. fk_name: The column name on the related object that refers to item["id"]
  278. Example:
  279. Writer has a one-to-many relation to books.
  280. >>> writer = {"id": 2, "name": "John Doe", "books": {"title": "Foo"}}
  281. >>> _set_related_one_to_many(
  282. item=writer,
  283. result=writer,
  284. related_gateway=BookSQLGateway,
  285. field_name="books",
  286. fk_name="writer_id",
  287. )
  288. >>> result
  289. {
  290. "id": 2,
  291. "name": "John Doe",
  292. "books": [
  293. {
  294. "id": 1",
  295. "title": "Foo",
  296. "writer_id": 2
  297. }
  298. ]
  299. }
  300. """
  301. assert not self.multitenant
  302. # list existing related objects
  303. existing_lut = {
  304. x["id"]: x
  305. for x in await self.filter([Filter(field=fk_name, values=[result["id"]])])
  306. }
  307. # add / update them where necessary
  308. returned = []
  309. for new_value in item.get(field_name, []):
  310. new_value = {fk_name: result["id"], **new_value}
  311. existing = existing_lut.pop(new_value.get("id"), None)
  312. if existing is None:
  313. returned.append(await self.add(new_value))
  314. elif new_value == existing:
  315. returned.append(existing)
  316. else:
  317. returned.append(await self.update(new_value))
  318. result[field_name] = returned
  319. # remove remaining
  320. for to_remove in existing_lut:
  321. assert await self.remove(to_remove)