test_dramatiq_task_logger.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import os
  2. from unittest import mock
  3. import pytest
  4. from dramatiq.errors import Retry
  5. from dramatiq.message import Message
  6. from clean_python import InMemoryGateway
  7. from clean_python.dramatiq import DramatiqTaskLogger
  8. @pytest.fixture
  9. def in_memory_gateway():
  10. return InMemoryGateway(data=[])
  11. @pytest.fixture
  12. def task_logger(in_memory_gateway):
  13. return DramatiqTaskLogger(
  14. hostname="host",
  15. gateway_override=in_memory_gateway,
  16. )
  17. @pytest.fixture
  18. def message():
  19. return Message(
  20. queue_name="default",
  21. actor_name="my_task",
  22. args=(1, 2),
  23. kwargs={"foo": "bar"},
  24. options={},
  25. message_id="abc123",
  26. message_timestamp=None,
  27. )
  28. @pytest.fixture
  29. def expected():
  30. return {
  31. "id": 1,
  32. "tag_suffix": "task_log",
  33. "task_id": "abc123",
  34. "name": "my_task",
  35. "state": "SUCCESS",
  36. "duration": 0,
  37. "retries": 0,
  38. "origin": f"host-{os.getpid()}",
  39. "argsrepr": b"[1,2]",
  40. "kwargsrepr": b'{"foo":"bar"}',
  41. "result": None,
  42. }
  43. @mock.patch("time.time", return_value=123)
  44. async def test_log_success(time, task_logger, in_memory_gateway, message, expected):
  45. await task_logger.start()
  46. await task_logger.stop(message)
  47. assert in_memory_gateway.data[1] == expected
  48. @mock.patch("time.time", new=mock.Mock(return_value=123))
  49. async def test_log_fail(task_logger, in_memory_gateway, message, expected):
  50. await task_logger.start()
  51. await task_logger.stop(message, exception=ValueError("test"))
  52. assert in_memory_gateway.data[1] == {
  53. **expected,
  54. "state": "FAILURE",
  55. "result": None,
  56. }
  57. @mock.patch("time.time", return_value=123)
  58. async def test_log_retry(time, task_logger, in_memory_gateway, message, expected):
  59. await task_logger.start()
  60. await task_logger.stop(message, exception=Retry("test"))
  61. assert in_memory_gateway.data[1] == {
  62. **expected,
  63. "state": "RETRY",
  64. }