test_celery_task_logger.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from unittest import mock
  2. from uuid import uuid4
  3. import pytest
  4. from celery import Task
  5. from clean_python import InMemorySyncGateway
  6. from clean_python.celery import CeleryTaskLogger
  7. @pytest.fixture
  8. def celery_task_logger() -> CeleryTaskLogger:
  9. return CeleryTaskLogger(InMemorySyncGateway([]))
  10. def test_log_minimal(celery_task_logger: CeleryTaskLogger):
  11. celery_task_logger.stop(Task(), "STAAT")
  12. (entry,) = celery_task_logger.gateway.filter([])
  13. assert entry == {
  14. "id": 1,
  15. "tag_suffix": "task_log",
  16. "task_id": None,
  17. "name": None,
  18. "state": "STAAT",
  19. "duration": None,
  20. "origin": None,
  21. "argsrepr": None,
  22. "kwargsrepr": None,
  23. "result": None,
  24. "time": None,
  25. "tenant_id": None,
  26. "correlation_id": None,
  27. "retries": None,
  28. }
  29. def test_log_with_duration(celery_task_logger: CeleryTaskLogger):
  30. with mock.patch("time.time", return_value=1.0):
  31. celery_task_logger.start()
  32. with mock.patch("time.time", return_value=100.0):
  33. celery_task_logger.stop(Task(), "STAAT")
  34. (entry,) = celery_task_logger.gateway.filter([])
  35. assert entry["time"] == 1.0
  36. assert entry["duration"] == 99.0
  37. @pytest.fixture
  38. def celery_task():
  39. # it seems impossible to instantiate a true celery Task object...
  40. request = mock.Mock()
  41. request.id = "abc123"
  42. request.origin = "hostname"
  43. request.retries = 25
  44. request.args = [1, 2]
  45. request.kwargs = {
  46. "clean_python_context": {
  47. "tenant": {"id": 15, "name": "foo"},
  48. "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
  49. }
  50. }
  51. task = mock.Mock()
  52. task.name = "task_name"
  53. task.request = request
  54. return task
  55. def test_log_with_request(celery_task_logger: CeleryTaskLogger, celery_task):
  56. celery_task_logger.stop(celery_task, "STAAT")
  57. (entry,) = celery_task_logger.gateway.filter([])
  58. assert entry["name"] == "task_name"
  59. assert entry["task_id"] == "abc123"
  60. assert entry["retries"] == 25
  61. assert entry["argsrepr"] == "[1, 2]"
  62. assert entry["kwargsrepr"] == "{}"
  63. assert entry["origin"] == "hostname"
  64. assert entry["tenant_id"] == 15
  65. assert entry["correlation_id"] == "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4"
  66. @pytest.mark.parametrize(
  67. "result,expected",
  68. [
  69. ({"a": "b"}, {"a": "b"}),
  70. ("str", {"result": "str"}), # str to dict
  71. ([1], {"result": [1]}), # list to dict
  72. ({"a": uuid4()}, None), # not-json-serializable
  73. ],
  74. )
  75. def test_log_with_result(
  76. celery_task_logger: CeleryTaskLogger, celery_task, result, expected
  77. ):
  78. celery_task_logger.stop(celery_task, "STAAT", result=result)
  79. (entry,) = celery_task_logger.gateway.filter([])
  80. assert entry["result"] == expected
  81. def test_log_with_request_no_args_kwargs(
  82. celery_task_logger: CeleryTaskLogger, celery_task
  83. ):
  84. celery_task.request.args = None
  85. celery_task.request.kwargs = None
  86. celery_task_logger.stop(celery_task, "STAAT")
  87. (entry,) = celery_task_logger.gateway.filter([])
  88. assert entry["argsrepr"] is None
  89. assert entry["kwargsrepr"] is None
  90. assert entry["tenant_id"] is None
  91. assert entry["correlation_id"] is None