test_dramatiq_task_logger.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import os
  2. from unittest import mock
  3. from uuid import uuid4
  4. import pytest
  5. from dramatiq.errors import Retry
  6. from dramatiq.message import Message
  7. from clean_python import ctx
  8. from clean_python import InMemoryGateway
  9. from clean_python.dramatiq import DramatiqTaskLogger
  10. @pytest.fixture
  11. def in_memory_gateway():
  12. return InMemoryGateway(data=[])
  13. @pytest.fixture
  14. def task_logger(in_memory_gateway):
  15. return DramatiqTaskLogger(
  16. hostname="host",
  17. gateway_override=in_memory_gateway,
  18. )
  19. @pytest.fixture
  20. def correlation_id():
  21. uid = uuid4()
  22. ctx.correlation_id = uid
  23. yield uid
  24. ctx.correlation_id = None
  25. @pytest.fixture
  26. def patched_time():
  27. with mock.patch("time.time", side_effect=(0, 123.456)):
  28. yield
  29. @pytest.fixture
  30. def message():
  31. return Message(
  32. queue_name="default",
  33. actor_name="my_task",
  34. args=(1, 2),
  35. kwargs={"foo": "bar"},
  36. options={},
  37. message_id="abc123",
  38. message_timestamp=None,
  39. )
  40. @pytest.fixture
  41. def expected(correlation_id):
  42. return {
  43. "id": 1,
  44. "tag_suffix": "task_log",
  45. "task_id": "abc123",
  46. "name": "my_task",
  47. "state": "SUCCESS",
  48. "duration": 123.456,
  49. "retries": 0,
  50. "origin": f"host-{os.getpid()}",
  51. "argsrepr": b"[1,2]",
  52. "kwargsrepr": b'{"foo":"bar"}',
  53. "result": None,
  54. "time": 0.0,
  55. "correlation_id": str(correlation_id),
  56. }
  57. async def test_log_success(
  58. patched_time, task_logger, in_memory_gateway, message, expected
  59. ):
  60. await task_logger.start()
  61. await task_logger.stop(message)
  62. assert in_memory_gateway.data[1] == expected
  63. async def test_log_fail(
  64. patched_time, task_logger, in_memory_gateway, message, expected
  65. ):
  66. await task_logger.start()
  67. await task_logger.stop(message, exception=ValueError("test"))
  68. assert in_memory_gateway.data[1] == {
  69. **expected,
  70. "state": "FAILURE",
  71. "result": None,
  72. }
  73. async def test_log_retry(
  74. patched_time, task_logger, in_memory_gateway, message, expected
  75. ):
  76. await task_logger.start()
  77. await task_logger.stop(message, exception=Retry("test"))
  78. assert in_memory_gateway.data[1] == {
  79. **expected,
  80. "state": "RETRY",
  81. }