test_celery_base_task.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from unittest import mock
  2. from uuid import UUID
  3. from uuid import uuid4
  4. import pytest
  5. from celery import Task
  6. from clean_python import ctx
  7. from clean_python import Tenant
  8. from clean_python.celery import BaseTask
  9. from clean_python.celery.base_task import HEADER_FIELD
  10. @pytest.fixture
  11. def mocked_apply_async():
  12. with mock.patch.object(Task, "apply_async") as m:
  13. yield m
  14. @pytest.fixture
  15. def temp_context():
  16. ctx.tenant = Tenant(id=2, name="test")
  17. ctx.correlation_id = uuid4()
  18. yield ctx
  19. ctx.tenant = None
  20. ctx.correlation_id = None
  21. @mock.patch(
  22. "clean_python.celery.base_task.uuid4",
  23. return_value=UUID("479156af-a302-48fc-89ed-8c426abadc4c"),
  24. )
  25. def test_apply_async(uuid4, mocked_apply_async):
  26. BaseTask().apply_async(args=("foo",), kwargs={"a": "bar"})
  27. assert mocked_apply_async.call_count == 1
  28. (args, kwargs), _ = mocked_apply_async.call_args
  29. assert args == ("foo",)
  30. assert kwargs["a"] == "bar"
  31. assert kwargs[HEADER_FIELD] == {
  32. "tenant": None,
  33. "correlation_id": "479156af-a302-48fc-89ed-8c426abadc4c",
  34. }
  35. def test_apply_async_with_context(mocked_apply_async, temp_context):
  36. BaseTask().apply_async(args=("foo",), kwargs={"a": "bar"})
  37. assert mocked_apply_async.call_count == 1
  38. (_, kwargs), _ = mocked_apply_async.call_args
  39. assert kwargs["a"] == "bar"
  40. assert kwargs[HEADER_FIELD]["tenant"] == temp_context.tenant.model_dump(mode="json")
  41. assert kwargs[HEADER_FIELD]["correlation_id"] == str(temp_context.correlation_id)