"""Tests for ``CeleryTaskLogger`` backed by an in-memory gateway."""
from unittest import mock
from uuid import uuid4

import pytest
from celery import Task

from clean_python import InMemorySyncGateway
from clean_python.celery import CeleryTaskLogger


@pytest.fixture
def celery_task_logger() -> CeleryTaskLogger:
    """Provide a logger that writes to an initially empty in-memory gateway."""
    gateway = InMemorySyncGateway([])
    return CeleryTaskLogger(gateway)


def test_log_minimal(celery_task_logger: CeleryTaskLogger):
    """A bare ``Task`` stopped without a prior ``start()`` logs mostly ``None``.

    Only ``id``, ``tag_suffix`` and ``state`` carry a value in the entry.
    """
    expected = {
        "id": 1,
        "tag_suffix": "task_log",
        "task_id": None,
        "name": None,
        "state": "STAAT",
        "duration": None,
        "origin": None,
        "args": None,
        "kwargs": None,
        "result": None,
        "time": None,
        "tenant_id": None,
        "correlation_id": None,
        "retries": None,
    }

    celery_task_logger.stop(Task(), "STAAT")

    # Unpacking doubles as a check that exactly one entry was written.
    [entry] = celery_task_logger.gateway.filter([])
    assert entry == expected


def test_log_with_duration(celery_task_logger: CeleryTaskLogger):
    """``time`` is the start timestamp; ``duration`` is stop minus start."""
    started_at = 1.0
    stopped_at = 100.0

    # Freeze the clock separately around start() and stop().
    with mock.patch("time.time", return_value=started_at):
        celery_task_logger.start()
    with mock.patch("time.time", return_value=stopped_at):
        celery_task_logger.stop(Task(), "STAAT")

    [entry] = celery_task_logger.gateway.filter([])
    assert entry["time"] == started_at
    assert entry["duration"] == stopped_at - started_at


@pytest.fixture
def celery_task() -> mock.Mock:
    """Provide a mocked task whose ``request`` carries id, args and context."""
    # it seems impossible to instantiate a true celery Task object...
    context = {
        "tenant": None,
        "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
    }
    request = mock.Mock()
    request.configure_mock(
        id="abc123",
        origin="hostname",
        retries=25,
        args=[1, 2],
        kwargs={"clean_python_context": context},
    )

    task = mock.Mock()
    # ``name`` is a reserved Mock constructor argument, so assign it afterwards.
    task.name = "task_name"
    task.request = request
    return task


def test_log_with_request(celery_task_logger: CeleryTaskLogger, celery_task):
    """Request metadata is copied into the logged entry.

    The ``clean_python_context`` keyword argument is absent from the logged
    ``kwargs``; its ``correlation_id`` shows up as a top-level field instead.
    """
    expected = {
        "name": "task_name",
        "task_id": "abc123",
        "retries": 25,
        "args": [1, 2],
        "kwargs": {},
        "origin": "hostname",
        "correlation_id": "b3089ea7-2585-43e5-a63c-ae30a6e9b5e4",
    }

    celery_task_logger.stop(celery_task, "STAAT")

    [entry] = celery_task_logger.gateway.filter([])
    # Compare only the fields under test; other keys are covered elsewhere.
    assert {key: entry[key] for key in expected} == expected


@pytest.mark.parametrize(
    "result,expected",
    [
        ({"a": "b"}, {"a": "b"}),
        ("str", {"result": "str"}),  # str to dict
        ([1], {"result": [1]}),  # list to dict
        ({"a": uuid4()}, None),  # not-json-serializable
    ],
)
def test_log_with_result(
    celery_task_logger: CeleryTaskLogger, celery_task, result, expected
):
    """Dict results are logged as-is, other values are wrapped under ``result``.

    A result that is not JSON-serializable is logged as ``None``.
    """
    celery_task_logger.stop(celery_task, "STAAT", result=result)

    [entry] = celery_task_logger.gateway.filter([])
    assert entry["result"] == expected