from unittest import mock
from uuid import uuid4

import pytest
from celery import Task

from clean_python import InMemorySyncGateway
from clean_python.celery import CeleryTaskLogger


@pytest.fixture
def celery_task_logger() -> CeleryTaskLogger:
    """Provide a CeleryTaskLogger that writes to an initially empty in-memory gateway."""
    gateway = InMemorySyncGateway([])
    return CeleryTaskLogger(gateway)


def test_log_minimal(celery_task_logger: CeleryTaskLogger):
    """Stopping with a bare Task and no prior start() yields one mostly-empty entry."""
    celery_task_logger.stop(Task(), "STAAT")

    [record] = celery_task_logger.gateway.filter([])

    # Every field that is not listed in the update below is expected to be None.
    expected = dict.fromkeys(
        (
            "task_id",
            "name",
            "duration",
            "origin",
            "argsrepr",
            "kwargsrepr",
            "result",
            "time",
            "tenant_id",
            "correlation_id",
            "retries",
        )
    )
    expected.update({"id": 1, "tag_suffix": "task_log", "state": "STAAT"})
    assert record == expected


def test_log_with_duration(celery_task_logger: CeleryTaskLogger):
    """The entry records the start time and the elapsed time between start() and stop()."""
    started_at, stopped_at = 1.0, 100.0

    # Freeze the clock separately around each call so the expected values are exact.
    with mock.patch("time.time", return_value=started_at):
        celery_task_logger.start()
    with mock.patch("time.time", return_value=stopped_at):
        celery_task_logger.stop(Task(), "STAAT")

    [record] = celery_task_logger.gateway.filter([])
    assert record["time"] == started_at
    assert record["duration"] == stopped_at - started_at


@pytest.fixture
def celery_task():
    """Provide a mocked task carrying a name and a populated request."""
    # It seems impossible to instantiate a true celery Task object, so a mock
    # stands in for it.
    context = {
        "tenant": {"id": 15, "name": "foo"},
        "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
    }

    fake_request = mock.Mock()
    fake_request.configure_mock(
        id="abc123",
        origin="hostname",
        retries=25,
        args=[1, 2],
        kwargs={"clean_python_context": context},
    )

    fake_task = mock.Mock()
    # "name" is assigned after construction: as a Mock() constructor argument it
    # would name the mock itself instead of creating a .name attribute.
    fake_task.name = "task_name"
    fake_task.request = fake_request
    return fake_task


def test_log_with_request(celery_task_logger: CeleryTaskLogger, celery_task):
    """Task name and request details end up in the logged entry."""
    celery_task_logger.stop(celery_task, "STAAT")

    [record] = celery_task_logger.gateway.filter([])

    expected_fields = {
        "name": "task_name",
        "task_id": "abc123",
        "retries": 25,
        "argsrepr": "[1, 2]",
        # The request kwargs only hold "clean_python_context"; the logged repr
        # is expected to be an empty dict.
        "kwargsrepr": "{}",
        "origin": "hostname",
        "tenant_id": 15,
        "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
    }
    for field, value in expected_fields.items():
        assert record[field] == value


@pytest.mark.parametrize(
    "result,expected",
    [
        ({"a": "b"}, {"a": "b"}),  # dict is stored unchanged
        ("str", {"result": "str"}),  # str is wrapped in a dict
        ([1], {"result": [1]}),  # list is wrapped in a dict
        ({"a": uuid4()}, None),  # not JSON-serializable: nothing is stored
    ],
)
def test_log_with_result(
    celery_task_logger: CeleryTaskLogger, celery_task, result, expected
):
    """The task result is logged in the form given by the parametrized cases."""
    celery_task_logger.stop(celery_task, "STAAT", result=result)

    [record] = celery_task_logger.gateway.filter([])
    assert record["result"] == expected


def test_log_with_request_no_args_kwargs(
    celery_task_logger: CeleryTaskLogger, celery_task
):
    """A request without args and kwargs leaves the derived fields at None."""
    celery_task.request.args = None
    celery_task.request.kwargs = None

    celery_task_logger.stop(celery_task, "STAAT")

    [record] = celery_task_logger.gateway.filter([])
    for field in ("argsrepr", "kwargsrepr", "tenant_id", "correlation_id"):
        assert record[field] is None