Преглед на файлове

Automatically retry on conflict (#38)

Casper van der Wel преди 1 година
родител
ревизия
e122e8e3bd
променени са 5 файла, в които са добавени 47 реда и са изтрити 8 реда
  1. 4 2
      CHANGES.md
  2. 21 2
      clean_python/base/application/manage.py
  3. 1 1
      clean_python/base/domain/exceptions.py
  4. 1 1
      pyproject.toml
  5. 20 2
      tests/test_manage.py

+ 4 - 2
CHANGES.md

@@ -1,10 +1,12 @@
 # Changelog of clean-python
 
 
-0.8.5 (unreleased)
+0.9.0 (unreleased)
 ------------------
 
-- Nothing changed yet.
+- Manage.update now automatically retries if a Conflict is raised.
+
+- AlreadyExists is not a subclass of Conflict anymore.
 
 
 0.8.4 (2023-11-15)

+ 21 - 2
clean_python/base/application/manage.py

@@ -7,6 +7,9 @@ from typing import Optional
 from typing import Type
 from typing import TypeVar
 
+import backoff
+
+from clean_python.base.domain import Conflict
 from clean_python.base.domain import Filter
 from clean_python.base.domain import Id
 from clean_python.base.domain import Json
@@ -17,7 +20,6 @@ from clean_python.base.domain import RootEntity
 
 T = TypeVar("T", bound=RootEntity)
 
-
 __all__ = ["Manage"]
 
 
@@ -42,7 +44,24 @@ class Manage(Generic[T]):
     async def create(self, values: Json) -> T:
         return await self.repo.add(values)
 
-    async def update(self, id: Id, values: Json) -> T:
+    async def update(self, id: Id, values: Json, retry_on_conflict: bool = True) -> T:
+        """This update has a built-in retry function that can be switched off.
+
+        This because some gateways (SQLGateway, ApiGateway) may raise Conflict
+        errors in case there are concurrency issues. The backoff strategy assumes that
+        we can retry immediately (because the conflict is gone immediately), but it
+        does add some jitter between 0 and 200 ms to avoid many competing processes.
+
+        If the repo.update is not idempotent (which is atypical), retries should be
+        switched off.
+        """
+        if retry_on_conflict:
+            return await self._update_with_retries(id, values)
+        else:
+            return await self.repo.update(id, values)
+
+    @backoff.on_exception(backoff.constant, Conflict, max_tries=10, interval=0.2)
+    async def _update_with_retries(self, id: Id, values: Json) -> T:
         return await self.repo.update(id, values)
 
     async def destroy(self, id: Id) -> bool:

+ 1 - 1
clean_python/base/domain/exceptions.py

@@ -41,7 +41,7 @@ class Conflict(Exception):
         super().__init__(msg)
 
 
-class AlreadyExists(Conflict):
+class AlreadyExists(Exception):
     def __init__(self, value: Any = None, key: str = "id"):
         super().__init__(f"record with {key}={value} already exists")
 

+ 1 - 1
pyproject.toml

@@ -10,7 +10,7 @@ license = {text = "MIT"}
 classifiers = ["Programming Language :: Python"]
 keywords = []
 requires-python = ">=3.7"
-dependencies = ["pydantic==2.*", "inject==4.*", "asgiref", "blinker", "async-lru"]
+dependencies = ["pydantic==2.*", "inject==4.*", "asgiref", "blinker", "async-lru", "backoff"]
 dynamic = ["version"]
 
 [project.optional-dependencies]

+ 20 - 2
tests/test_manage.py

@@ -4,6 +4,7 @@ from unittest import mock
 
 import pytest
 
+from clean_python import Conflict
 from clean_python import Filter
 from clean_python import Manage
 from clean_python import RootEntity
@@ -39,8 +40,6 @@ async def test_create(manage_user):
 
 
 async def test_update(manage_user):
-    manage_user.repo.get.return_value = User.create(id=2, name="piet")
-
     result = await manage_user.update(2, {"name": "jan"})
 
     manage_user.repo.update.assert_awaited_once_with(2, {"name": "jan"})
@@ -94,3 +93,22 @@ async def test_exists(manage_user):
     manage_user.repo.exists.assert_awaited_once_with(filters)
 
     assert result is manage_user.repo.exists.return_value
+
+
+@pytest.mark.parametrize("failure_count", [1, 2])
+async def test_update_retry_on_conflict(manage_user, failure_count: int):
+    manage_user.repo.update.side_effect = (Conflict,) * failure_count + (
+        {"name": "foo"},
+    )
+
+    result = await manage_user.update(2, {"name": "jan"})
+
+    assert manage_user.repo.update.call_count == failure_count + 1
+    assert result == {"name": "foo"}
+
+
+async def test_update_retry_on_conflict_opt_out(manage_user):
+    manage_user.repo.update.side_effect = (Conflict, {"name": "foo"})
+
+    with pytest.raises(Conflict):
+        await manage_user.update(2, {"name": "jan"}, retry_on_conflict=False)