sql_gateway.py 12 KB


  1. # (c) Nelen & Schuurmans
  2. from contextlib import asynccontextmanager
  3. from datetime import datetime
  4. from typing import AsyncIterator
  5. from typing import Callable
  6. from typing import List
  7. from typing import Optional
  8. from typing import TypeVar
  9. import inject
  10. from sqlalchemy import and_
  11. from sqlalchemy import asc
  12. from sqlalchemy import delete
  13. from sqlalchemy import desc
  14. from sqlalchemy import func
  15. from sqlalchemy import select
  16. from sqlalchemy import Table
  17. from sqlalchemy import true
  18. from sqlalchemy import update
  19. from sqlalchemy.dialects.postgresql import insert
  20. from sqlalchemy.exc import IntegrityError
  21. from sqlalchemy.sql import Executable
  22. from sqlalchemy.sql.expression import ColumnElement
  23. from sqlalchemy.sql.expression import false
  24. from clean_python import AlreadyExists
  25. from clean_python import Conflict
  26. from clean_python import ctx
  27. from clean_python import DoesNotExist
  28. from clean_python import Filter
  29. from clean_python import Gateway
  30. from clean_python import Json
  31. from clean_python import PageOptions
  32. from .sql_provider import SQLDatabase
  33. from .sql_provider import SQLProvider
  34. __all__ = ["SQLGateway"]
  35. def _is_unique_violation_error_id(e: IntegrityError, id: int):
  36. # sqlalchemy wraps the asyncpg error
  37. msg = e.orig.args[0]
  38. return ("duplicate key value violates unique constraint" in msg) and (
  39. f"Key (id)=({id}) already exists." in msg
  40. )
  41. T = TypeVar("T", bound="SQLGateway")
  42. class SQLGateway(Gateway):
  43. table: Table
  44. nested: bool
  45. multitenant: bool
  46. def __init__(
  47. self,
  48. provider_override: Optional[SQLProvider] = None,
  49. nested: bool = False,
  50. ):
  51. self.provider_override = provider_override
  52. self.nested = nested
  53. @property
  54. def provider(self):
  55. return self.provider_override or inject.instance(SQLDatabase)
  56. def __init_subclass__(cls, table: Table, multitenant: bool = False) -> None:
  57. cls.table = table
  58. if multitenant and not hasattr(table.c, "tenant"):
  59. raise ValueError("Can't use a multitenant SQLGateway without tenant column")
  60. cls.multitenant = multitenant
  61. super().__init_subclass__()
  62. def rows_to_dict(self, rows: List[Json]) -> List[Json]:
  63. return rows
  64. def dict_to_row(self, obj: Json) -> Json:
  65. known = {c.key for c in self.table.c}
  66. result = {k: obj[k] for k in obj.keys() if k in known}
  67. if "id" in result and result["id"] is None:
  68. del result["id"]
  69. if self.multitenant:
  70. result["tenant"] = self.current_tenant
  71. return result
  72. @asynccontextmanager
  73. async def transaction(self: T) -> AsyncIterator[T]:
  74. if self.nested:
  75. yield self
  76. else:
  77. async with self.provider.transaction() as provider:
  78. yield self.__class__(provider, nested=True)
  79. @property
  80. def current_tenant(self) -> Optional[int]:
  81. if not self.multitenant:
  82. return None
  83. if ctx.tenant is None:
  84. raise RuntimeError(f"{self.__class__} requires a tenant in the context")
  85. return ctx.tenant.id
  86. async def get_related(self, items: List[Json]) -> None:
  87. pass
  88. async def set_related(self, item: Json, result: Json) -> None:
  89. pass
  90. async def execute(self, query: Executable) -> List[Json]:
  91. assert self.nested
  92. return self.rows_to_dict(await self.provider.execute(query))
  93. async def add(self, item: Json) -> Json:
  94. query = (
  95. insert(self.table).values(**self.dict_to_row(item)).returning(self.table)
  96. )
  97. async with self.transaction() as transaction:
  98. try:
  99. (result,) = await transaction.execute(query)
  100. except IntegrityError as e:
  101. id_ = item.get("id")
  102. if id_ is not None and _is_unique_violation_error_id(e, id_):
  103. raise AlreadyExists(id_)
  104. raise
  105. await transaction.set_related(item, result)
  106. return result
  107. async def update(
  108. self, item: Json, if_unmodified_since: Optional[datetime] = None
  109. ) -> Json:
  110. id_ = item.get("id")
  111. if id_ is None:
  112. raise DoesNotExist("record", id_)
  113. q = self._id_filter_to_sql(id_)
  114. if if_unmodified_since is not None:
  115. q &= self.table.c.updated_at == if_unmodified_since
  116. query = (
  117. update(self.table)
  118. .where(q)
  119. .values(**self.dict_to_row(item))
  120. .returning(self.table)
  121. )
  122. async with self.transaction() as transaction:
  123. result = await transaction.execute(query)
  124. if not result:
  125. if if_unmodified_since is not None:
  126. # note: the get() is to maybe raise DoesNotExist
  127. if await self.get(id_):
  128. raise Conflict()
  129. raise DoesNotExist("record", id_)
  130. await transaction.set_related(item, result[0])
  131. return result[0]
  132. async def _select_for_update(self, id: int) -> Json:
  133. async with self.transaction() as transaction:
  134. result = await transaction.execute(
  135. select(self.table).with_for_update().where(self._id_filter_to_sql(id)),
  136. )
  137. if not result:
  138. raise DoesNotExist("record", id)
  139. await transaction.get_related(result)
  140. return result[0]
  141. async def update_transactional(self, id: int, func: Callable[[Json], Json]) -> Json:
  142. async with self.transaction() as transaction:
  143. existing = await transaction._select_for_update(id)
  144. updated = func(existing)
  145. return await transaction.update(updated)
  146. async def upsert(self, item: Json) -> Json:
  147. if item.get("id") is None:
  148. return await self.add(item)
  149. values = self.dict_to_row(item)
  150. query = (
  151. insert(self.table)
  152. .values(**values)
  153. .on_conflict_do_update(
  154. index_elements=["id", "tenant"] if self.multitenant else ["id"],
  155. set_=values,
  156. )
  157. .returning(self.table)
  158. )
  159. async with self.transaction() as transaction:
  160. result = await transaction.execute(query)
  161. await transaction.set_related(item, result[0])
  162. return result[0]
  163. async def remove(self, id) -> bool:
  164. query = (
  165. delete(self.table)
  166. .where(self._id_filter_to_sql(id))
  167. .returning(self.table.c.id)
  168. )
  169. async with self.transaction() as transaction:
  170. result = await transaction.execute(query)
  171. return bool(result)
  172. def _filter_to_sql(self, filter: Filter) -> ColumnElement:
  173. try:
  174. column = getattr(self.table.c, filter.field)
  175. except AttributeError:
  176. return false()
  177. if len(filter.values) == 0:
  178. return false()
  179. elif len(filter.values) == 1:
  180. return column == filter.values[0]
  181. else:
  182. return column.in_(filter.values)
  183. def _filters_to_sql(self, filters: List[Filter]) -> ColumnElement:
  184. qs = [self._filter_to_sql(x) for x in filters]
  185. if self.multitenant:
  186. qs.append(self.table.c.tenant == self.current_tenant)
  187. return and_(*qs)
  188. def _id_filter_to_sql(self, id: int) -> ColumnElement:
  189. return self._filters_to_sql([Filter(field="id", values=[id])])
  190. async def filter(
  191. self, filters: List[Filter], params: Optional[PageOptions] = None
  192. ) -> List[Json]:
  193. query = select(self.table).where(self._filters_to_sql(filters))
  194. if params is not None:
  195. sort = asc(params.order_by) if params.ascending else desc(params.order_by)
  196. query = query.order_by(sort).limit(params.limit).offset(params.offset)
  197. async with self.transaction() as transaction:
  198. result = await transaction.execute(query)
  199. await transaction.get_related(result)
  200. return result
  201. async def count(self, filters: List[Filter]) -> int:
  202. query = (
  203. select(func.count().label("count"))
  204. .select_from(self.table)
  205. .where(self._filters_to_sql(filters))
  206. )
  207. async with self.transaction() as transaction:
  208. return (await transaction.execute(query))[0]["count"]
  209. async def exists(self, filters: List[Filter]) -> bool:
  210. query = (
  211. select(true().label("exists"))
  212. .select_from(self.table)
  213. .where(self._filters_to_sql(filters))
  214. .limit(1)
  215. )
  216. async with self.transaction() as transaction:
  217. return len(await transaction.execute(query)) > 0
  218. async def _get_related_one_to_many(
  219. self,
  220. items: List[Json],
  221. field_name: str,
  222. fk_name: str,
  223. ) -> None:
  224. """Fetch related objects for `items` and add them inplace.
  225. The result is `items` having an additional field containing a list of related
  226. objects which were retrieved from self in 1 SELECT query.
  227. Args:
  228. items: The items for which to fetch related objects. Changed inplace.
  229. field_name: The key in item to put the fetched related objects into.
  230. fk_name: The column name on the related object that refers to item["id"]
  231. Example:
  232. Writer has a one-to-many relation to books.
  233. >>> writers = [{"id": 2, "name": "John Doe"}]
  234. >>> _get_related_one_to_many(
  235. items=writers,
  236. related_gateway=BookSQLGateway,
  237. field_name="books",
  238. fk_name="writer_id",
  239. )
  240. >>> writers[0]
  241. {
  242. "id": 2,
  243. "name": "John Doe",
  244. "books": [
  245. {
  246. "id": 1",
  247. "title": "How to write an ORM",
  248. "writer_id": 2
  249. }
  250. ]
  251. }
  252. """
  253. assert not self.multitenant
  254. for x in items:
  255. x[field_name] = []
  256. item_lut = {x["id"]: x for x in items}
  257. related_objs = await self.filter(
  258. [Filter(field=fk_name, values=list(item_lut.keys()))]
  259. )
  260. for related_obj in related_objs:
  261. item_lut[related_obj[fk_name]][field_name].append(related_obj)
  262. async def _set_related_one_to_many(
  263. self,
  264. item: Json,
  265. result: Json,
  266. field_name: str,
  267. fk_name: str,
  268. ) -> None:
  269. """Set related objects for `item`
  270. This method first fetches the current situation and then adds / updates / removes
  271. where appropriate.
  272. Args:
  273. item: The item for which to set related objects.
  274. result: The dictionary to put the resulting (added / updated) objects into
  275. field_name: The key in result to put the (added / updated) related objects into.
  276. fk_name: The column name on the related object that refers to item["id"]
  277. Example:
  278. Writer has a one-to-many relation to books.
  279. >>> writer = {"id": 2, "name": "John Doe", "books": {"title": "Foo"}}
  280. >>> _set_related_one_to_many(
  281. item=writer,
  282. result=writer,
  283. related_gateway=BookSQLGateway,
  284. field_name="books",
  285. fk_name="writer_id",
  286. )
  287. >>> result
  288. {
  289. "id": 2,
  290. "name": "John Doe",
  291. "books": [
  292. {
  293. "id": 1",
  294. "title": "Foo",
  295. "writer_id": 2
  296. }
  297. ]
  298. }
  299. """
  300. assert not self.multitenant
  301. # list existing related objects
  302. existing_lut = {
  303. x["id"]: x
  304. for x in await self.filter([Filter(field=fk_name, values=[result["id"]])])
  305. }
  306. # add / update them where necessary
  307. returned = []
  308. for new_value in item.get(field_name, []):
  309. new_value = {fk_name: result["id"], **new_value}
  310. existing = existing_lut.pop(new_value.get("id"), None)
  311. if existing is None:
  312. returned.append(await self.add(new_value))
  313. elif new_value == existing:
  314. returned.append(existing)
  315. else:
  316. returned.append(await self.update(new_value))
  317. result[field_name] = returned
  318. # remove remaining
  319. for to_remove in existing_lut:
  320. assert await self.remove(to_remove)