test_manage.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # (c) Nelen & Schuurmans
  2. from unittest import mock
  3. import pytest
  4. from clean_python import Conflict
  5. from clean_python import Filter
  6. from clean_python import Manage
  7. from clean_python import RootEntity
  class User(RootEntity):
      # Minimal entity used as the type parameter of the manager under test.
      name: str  # the only field declared on this entity
  class ManageUser(Manage[User]):
      """``Manage`` specialisation for ``User`` backed by a mock repository."""

      def __init__(self):
          # NOTE(review): the base-class initialiser is deliberately not
          # called here; presumably it expects a real repository -- confirm
          # against ``Manage``.
          # An AsyncMock lets the tests inspect which repository coroutines
          # were awaited and with which arguments.
          self.repo = mock.AsyncMock()
  @pytest.fixture
  def manage_user():
      """Provide a new ``ManageUser`` (with its own mock repository)."""
      manager = ManageUser()
      return manager
  async def test_retrieve(manage_user):
      """``retrieve`` awaits ``repo.get`` exactly once and returns its result."""
      repo_get = manage_user.repo.get

      retrieved = await manage_user.retrieve(2)

      # ``assert_awaited_once_with`` (rather than ``assert_awaited_with``)
      # also fails when the repository is awaited more than once, matching
      # the stricter check used by the other tests in this module.
      repo_get.assert_awaited_once_with(2)
      assert retrieved is repo_get.return_value
  async def test_create(manage_user):
      """``create`` hands the values to ``repo.add`` and returns its result."""
      repo_add = manage_user.repo.add

      created = await manage_user.create({"name": "piet"})

      repo_add.assert_awaited_once_with({"name": "piet"})
      assert created is repo_add.return_value
  async def test_update(manage_user):
      """``update`` passes id and values to ``repo.update`` and returns its result."""
      repo_update = manage_user.repo.update

      updated = await manage_user.update(2, {"name": "jan"})

      repo_update.assert_awaited_once_with(2, {"name": "jan"})
      assert updated is repo_update.return_value
  async def test_destroy(manage_user):
      """``destroy`` awaits ``repo.remove`` exactly once and returns its result."""
      repo_remove = manage_user.repo.remove

      removed = await manage_user.destroy(2)

      # Use the "once" variant so that an accidental second removal call is
      # detected; ``assert_awaited_with`` only inspects the most recent await.
      repo_remove.assert_awaited_once_with(2)
      assert removed is repo_remove.return_value
  async def test_list(manage_user):
      """``list`` awaits ``repo.all`` once and returns its result."""
      repo_all = manage_user.repo.all

      listed = await manage_user.list()

      # Only the await count is checked here, not the call arguments.
      repo_all.assert_awaited_once()
      assert listed is repo_all.return_value
  async def test_by(manage_user):
      """``by`` awaits ``repo.by`` exactly once with key, value and ``params=None``."""
      repo_by = manage_user.repo.by

      found = await manage_user.by("name", "piet")

      # The "once" variant additionally guards against duplicate repository
      # calls, consistent with the filter/count/exists tests.
      repo_by.assert_awaited_once_with("name", "piet", params=None)
      assert found is repo_by.return_value
  async def test_filter(manage_user):
      """``filter`` awaits ``repo.filter`` once with the filters and ``params=None``."""
      wanted = [Filter(field="x", values=[1])]
      repo_filter = manage_user.repo.filter

      outcome = await manage_user.filter(wanted)

      repo_filter.assert_awaited_once_with(wanted, params=None)
      assert outcome is repo_filter.return_value
  async def test_count(manage_user):
      """``count`` awaits ``repo.count`` once with the filters and returns its result."""
      wanted = [Filter(field="x", values=[1])]
      repo_count = manage_user.repo.count

      outcome = await manage_user.count(wanted)

      repo_count.assert_awaited_once_with(wanted)
      assert outcome is repo_count.return_value
  async def test_exists(manage_user):
      """``exists`` awaits ``repo.exists`` once with the filters and returns its result."""
      wanted = [Filter(field="x", values=[1])]
      repo_exists = manage_user.repo.exists

      outcome = await manage_user.exists(wanted)

      repo_exists.assert_awaited_once_with(wanted)
      assert outcome is repo_exists.return_value
  @pytest.mark.parametrize("failure_count", [1, 2])
  async def test_update_retry_on_conflict(manage_user, failure_count: int):
      """``update`` calls ``repo.update`` again after each ``Conflict`` until it succeeds."""
      stored = {"name": "foo"}
      repo_update = manage_user.repo.update
      # The mock raises Conflict ``failure_count`` times, then returns ``stored``.
      repo_update.side_effect = [Conflict] * failure_count + [stored]

      outcome = await manage_user.update(2, {"name": "jan"})

      # One call per simulated failure plus the final successful one.
      assert repo_update.call_count == failure_count + 1
      assert outcome == {"name": "foo"}
  async def test_update_retry_on_conflict_opt_out(manage_user):
      """With ``retry_on_conflict=False`` the first ``Conflict`` reaches the caller."""
      repo_update = manage_user.repo.update
      # A second call would succeed, so the test only passes when the
      # Conflict from the first call is raised to the caller.
      repo_update.side_effect = (Conflict, {"name": "foo"})

      with pytest.raises(Conflict):
          await manage_user.update(2, {"name": "jan"}, retry_on_conflict=False)