| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 | import osfrom unittest import mockfrom uuid import uuid4import pytestfrom dramatiq.errors import Retryfrom dramatiq.message import Messagefrom clean_python import ctxfrom clean_python import InMemoryGatewayfrom clean_python.dramatiq import DramatiqTaskLogger@pytest.fixturedef in_memory_gateway():    return InMemoryGateway(data=[])@pytest.fixturedef task_logger(in_memory_gateway):    return DramatiqTaskLogger(        hostname="host",        gateway_override=in_memory_gateway,    )@pytest.fixturedef correlation_id():    uid = uuid4()    ctx.correlation_id = uid    yield uid    ctx.correlation_id = None@pytest.fixturedef patched_time():    with mock.patch("time.time", side_effect=(0, 123.456)):        yield@pytest.fixturedef message():    return Message(        queue_name="default",        actor_name="my_task",        args=(1, 2),        kwargs={"foo": "bar"},        options={},        message_id="abc123",        message_timestamp=None,    )@pytest.fixturedef expected(correlation_id):    return {        "id": 1,        "tag_suffix": "task_log",        "task_id": "abc123",        "name": "my_task",        "state": "SUCCESS",        "duration": 123.456,        "retries": 0,        "origin": f"host-{os.getpid()}",        "argsrepr": b"[1,2]",        "kwargsrepr": b'{"foo":"bar"}',        "result": None,        "time": 0.0,        "correlation_id": str(correlation_id),    }async def test_log_success(    patched_time, task_logger, in_memory_gateway, message, expected):    await task_logger.start()    await task_logger.stop(message)    assert in_memory_gateway.data[1] == expectedasync def test_log_fail(    patched_time, task_logger, in_memory_gateway, message, expected):    await task_logger.start()    await task_logger.stop(message, exception=ValueError("test"))    assert in_memory_gateway.data[1] == {        **expected,        "state": "FAILURE",        "result": None,    }async def test_log_retry(    patched_time, task_logger, in_memory_gateway, message, expected):    await task_logger.start()    await task_logger.stop(message, exception=Retry("test"))    assert in_memory_gateway.data[1] == {        **expected,        "state": "RETRY",    }
 |