123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- from unittest import mock
- from uuid import uuid4
- import pytest
- from celery import Task
- from clean_python import InMemorySyncGateway
- from clean_python.celery import CeleryTaskLogger
@pytest.fixture
def celery_task_logger() -> CeleryTaskLogger:
    """Provide a ``CeleryTaskLogger`` wired to an in-memory gateway built from an empty list."""
    empty_gateway = InMemorySyncGateway([])
    return CeleryTaskLogger(empty_gateway)
def test_log_minimal(celery_task_logger: CeleryTaskLogger):
    """Check the record logged by ``stop()`` for a bare ``Task`` without a prior ``start()``.

    Only ``id``, ``tag_suffix`` and ``state`` are expected to carry values;
    every other field of the record is expected to be ``None``.
    """
    celery_task_logger.stop(Task(), "STAAT")

    # Single-target unpacking doubles as a check that exactly one record exists.
    [logged] = celery_task_logger.gateway.filter([])

    unset_fields = (
        "task_id",
        "name",
        "duration",
        "origin",
        "argsrepr",
        "kwargsrepr",
        "result",
        "time",
        "tenant_id",
        "correlation_id",
        "retries",
    )
    expected = {
        "id": 1,
        "tag_suffix": "task_log",
        "state": "STAAT",
        **dict.fromkeys(unset_fields),  # all of these map to None
    }
    assert logged == expected
def test_log_with_duration(celery_task_logger: CeleryTaskLogger):
    """Check ``time`` and ``duration`` when ``start()`` and ``stop()`` see different clocks.

    ``time.time`` is patched to 1.0 around ``start()`` and to 100.0 around
    ``stop()``; the record is expected to hold the start time and the
    99.0 second difference.
    """
    clock_at_start = mock.patch("time.time", return_value=1.0)
    clock_at_stop = mock.patch("time.time", return_value=100.0)

    with clock_at_start:
        celery_task_logger.start()

    with clock_at_stop:
        celery_task_logger.stop(Task(), "STAAT")

    [logged] = celery_task_logger.gateway.filter([])
    assert logged["time"] == 1.0
    assert logged["duration"] == 99.0
@pytest.fixture
def celery_task():
    """Provide a mocked task whose ``request`` carries id, origin, retries, args and kwargs.

    The kwargs hold a ``clean_python_context`` entry with a tenant and a
    correlation id.
    """
    # it seems impossible to instantiate a true celery Task object...
    context = {
        "tenant": {"id": 15, "name": "foo"},
        "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
    }
    request = mock.Mock(
        id="abc123",
        origin="hostname",
        retries=25,
        args=[1, 2],
        kwargs={"clean_python_context": context},
    )
    task = mock.Mock(request=request)
    # ``name`` must be assigned after construction: as a constructor argument
    # it would name the mock itself instead of setting the attribute.
    task.name = "task_name"
    return task
def test_log_with_request(celery_task_logger: CeleryTaskLogger, celery_task):
    """Check that the fields of the task's request end up in the logged record.

    The test expects ``kwargsrepr`` to be ``"{}"`` even though the request
    kwargs hold a ``clean_python_context`` entry, and expects the tenant id
    and correlation id from that entry to appear as separate fields.
    """
    celery_task_logger.stop(celery_task, "STAAT")
    [logged] = celery_task_logger.gateway.filter([])

    expected = {
        "name": "task_name",
        "task_id": "abc123",
        "retries": 25,
        "argsrepr": "[1, 2]",
        "kwargsrepr": "{}",
        "origin": "hostname",
        "tenant_id": 15,
        "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
    }
    # Compare only the fields under test; other record fields are ignored.
    observed = {field: logged[field] for field in expected}
    assert observed == expected
@pytest.mark.parametrize(
    "result,expected",
    [
        # a dict is expected to be logged unchanged
        ({"a": "b"}, {"a": "b"}),
        # str to dict
        ("str", {"result": "str"}),
        # list to dict
        ([1], {"result": [1]}),
        # not-json-serializable
        ({"a": uuid4()}, None),
    ],
)
def test_log_with_result(
    celery_task_logger: CeleryTaskLogger, celery_task, result, expected
):
    """Check how the ``result`` passed to ``stop()`` shows up in the logged record."""
    celery_task_logger.stop(celery_task, "STAAT", result=result)
    [logged] = celery_task_logger.gateway.filter([])
    assert logged["result"] == expected
def test_log_with_request_no_args_kwargs(
    celery_task_logger: CeleryTaskLogger, celery_task
):
    """Check the logged record when the request's ``args`` and ``kwargs`` are ``None``.

    The repr fields and the context-derived fields are all expected to be
    ``None`` in that case.
    """
    task_request = celery_task.request
    task_request.args = None
    task_request.kwargs = None

    celery_task_logger.stop(celery_task, "STAAT")
    [logged] = celery_task_logger.gateway.filter([])

    for field in ("argsrepr", "kwargsrepr", "tenant_id", "correlation_id"):
        assert logged[field] is None
|