# test_celery_task_logger.py

  1. from unittest import mock
  2. from uuid import uuid4
  3. import pytest
  4. from celery import Task
  5. from clean_python import InMemorySyncGateway
  6. from clean_python.celery import CeleryTaskLogger
  @pytest.fixture
  def celery_task_logger() -> CeleryTaskLogger:
      """Provide a ``CeleryTaskLogger`` backed by a gateway built from an empty list."""
      empty_gateway = InMemorySyncGateway([])
      return CeleryTaskLogger(empty_gateway)
  def test_log_minimal(celery_task_logger: CeleryTaskLogger):
      """A bare ``Task()`` without a prior ``start()`` logs one entry with no task details."""
      # Fields expected to be None when nothing but the state is supplied.
      unset_fields = (
          "task_id",
          "name",
          "duration",
          "origin",
          "argsrepr",
          "kwargsrepr",
          "result",
          "time",
          "tenant_id",
          "correlation_id",
          "retries",
      )
      expected_entry = dict.fromkeys(unset_fields)
      expected_entry.update({"id": 1, "tag_suffix": "task_log", "state": "STAAT"})

      celery_task_logger.stop(Task(), "STAAT")

      # Unpacking doubles as a check that exactly one entry was written.
      [entry] = celery_task_logger.gateway.filter([])
      assert entry == expected_entry
  def test_log_with_duration(celery_task_logger: CeleryTaskLogger):
      """The entry records the patched start time and the time elapsed until stop."""
      started_at = 1.0
      stopped_at = 100.0

      # time.time() is pinned separately around start() and stop() so that
      # both readings are fully controlled by the test.
      with mock.patch("time.time", return_value=started_at):
          celery_task_logger.start()
      with mock.patch("time.time", return_value=stopped_at):
          celery_task_logger.stop(Task(), "STAAT")

      [entry] = celery_task_logger.gateway.filter([])
      assert entry["time"] == started_at
      assert entry["duration"] == stopped_at - started_at
  @pytest.fixture
  def celery_task():
      """Provide a mocked task named ``task_name`` with a populated mocked ``request``."""
      # It seems impossible to instantiate a true celery Task object, so plain
      # mocks stand in for both the task and its request.
      context = {
          "tenant": None,
          "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
      }
      request = mock.Mock(
          id="abc123",
          origin="hostname",
          retries=25,
          args=[1, 2],
          kwargs={"clean_python_context": context},
      )
      task = mock.Mock(request=request)
      # ``name`` has to be assigned after construction: as a constructor
      # keyword it names the mock itself rather than setting the attribute.
      task.name = "task_name"
      return task
  def test_log_with_request(celery_task_logger: CeleryTaskLogger, celery_task):
      """Details of the task's request end up in the logged entry."""
      # The request kwargs of the fixture only hold "clean_python_context";
      # the expected "{}" means that key must not appear in kwargsrepr.
      expected_fields = {
          "name": "task_name",
          "task_id": "abc123",
          "retries": 25,
          "argsrepr": "[1, 2]",
          "kwargsrepr": "{}",
          "origin": "hostname",
          "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
      }

      celery_task_logger.stop(celery_task, "STAAT")

      [entry] = celery_task_logger.gateway.filter([])
      for field, value in expected_fields.items():
          assert entry[field] == value
  @pytest.mark.parametrize(
      "result,expected",
      [
          pytest.param({"a": "b"}, {"a": "b"}),  # dict is expected unchanged
          pytest.param("str", {"result": "str"}),  # str to dict
          pytest.param([1], {"result": [1]}),  # list to dict
          pytest.param({"a": uuid4()}, None),  # not-json-serializable
      ],
  )
  def test_log_with_result(
      celery_task_logger: CeleryTaskLogger, celery_task, result, expected
  ):
      """The ``result`` passed to ``stop`` is logged in the shape given by ``expected``."""
      celery_task_logger.stop(celery_task, "STAAT", result=result)

      logged = celery_task_logger.gateway.filter([])
      # Unpacking doubles as a check that exactly one entry was written.
      (entry,) = logged
      assert entry["result"] == expected