12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from unittest import mock
- from uuid import uuid4
- import pytest
- from celery import Task
- from clean_python import InMemorySyncGateway
- from clean_python.celery import CeleryTaskLogger
- @pytest.fixture
- def celery_task_logger() -> CeleryTaskLogger:
- return CeleryTaskLogger(InMemorySyncGateway([]))
- def test_log_minimal(celery_task_logger: CeleryTaskLogger):
- celery_task_logger.stop(Task(), "STAAT")
- (entry,) = celery_task_logger.gateway.filter([])
- assert entry == {
- "id": 1,
- "tag_suffix": "task_log",
- "task_id": None,
- "name": None,
- "state": "STAAT",
- "duration": None,
- "origin": None,
- "argsrepr": None,
- "kwargsrepr": None,
- "result": None,
- "time": None,
- "tenant_id": None,
- "correlation_id": None,
- "retries": None,
- }
- def test_log_with_duration(celery_task_logger: CeleryTaskLogger):
- with mock.patch("time.time", return_value=1.0):
- celery_task_logger.start()
- with mock.patch("time.time", return_value=100.0):
- celery_task_logger.stop(Task(), "STAAT")
- (entry,) = celery_task_logger.gateway.filter([])
- assert entry["time"] == 1.0
- assert entry["duration"] == 99.0
- @pytest.fixture
- def celery_task():
- # it seems impossible to instantiate a true celery Task object...
- request = mock.Mock()
- request.id = "abc123"
- request.origin = "hostname"
- request.retries = 25
- request.args = [1, 2]
- request.kwargs = {
- "clean_python_context": {
- "tenant": None,
- "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
- }
- }
- task = mock.Mock()
- task.name = "task_name"
- task.request = request
- return task
- def test_log_with_request(celery_task_logger: CeleryTaskLogger, celery_task):
- celery_task_logger.stop(celery_task, "STAAT")
- (entry,) = celery_task_logger.gateway.filter([])
- assert entry["name"] == "task_name"
- assert entry["task_id"] == "abc123"
- assert entry["retries"] == 25
- assert entry["argsrepr"] == "[1, 2]"
- assert entry["kwargsrepr"] == "{}"
- assert entry["origin"] == "hostname"
- assert entry["correlation_id"] == "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4"
- @pytest.mark.parametrize(
- "result,expected",
- [
- ({"a": "b"}, {"a": "b"}),
- ("str", {"result": "str"}), # str to dict
- ([1], {"result": [1]}), # list to dict
- ({"a": uuid4()}, None), # not-json-serializable
- ],
- )
- def test_log_with_result(
- celery_task_logger: CeleryTaskLogger, celery_task, result, expected
- ):
- celery_task_logger.stop(celery_task, "STAAT", result=result)
- (entry,) = celery_task_logger.gateway.filter([])
- assert entry["result"] == expected
|