| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 | from unittest import mockfrom uuid import UUIDfrom uuid import uuid4import pytestfrom celery import Taskfrom clean_python import ctxfrom clean_python import Tenantfrom clean_python.celery import BaseTaskfrom clean_python.celery.base_task import HEADER_FIELD@pytest.fixturedef mocked_apply_async():    with mock.patch.object(Task, "apply_async") as m:        yield m@pytest.fixturedef temp_context():    ctx.tenant = Tenant(id=2, name="test")    ctx.correlation_id = uuid4()    yield ctx    ctx.tenant = None    ctx.correlation_id = Nonedef test_apply_async(mocked_apply_async):    BaseTask().apply_async(args="foo", kwargs="bar")    assert mocked_apply_async.call_count == 1    args, kwargs = mocked_apply_async.call_args    assert args == ("foo", "bar")    assert kwargs["headers"][HEADER_FIELD]["tenant"] is None    UUID(kwargs["headers"][HEADER_FIELD]["correlation_id"])  # generateddef test_apply_async_with_context(mocked_apply_async, temp_context):    BaseTask().apply_async(args="foo", kwargs="bar")    assert mocked_apply_async.call_count == 1    _, kwargs = mocked_apply_async.call_args    assert kwargs["headers"][HEADER_FIELD]["tenant"] == temp_context.tenant.model_dump(        mode="json"    )    kwargs["headers"][HEADER_FIELD]["correlation_id"] == str(        temp_context.correlation_id    )def test_apply_async_headers_extended(mocked_apply_async):    headers = {"baz": 2}    BaseTask().apply_async(args="foo", kwargs="bar", headers=headers)    assert mocked_apply_async.call_count == 1    _, kwargs = mocked_apply_async.call_args    assert kwargs["headers"]["baz"] == 2    assert kwargs["headers"][HEADER_FIELD]["tenant"] is None    UUID(kwargs["headers"][HEADER_FIELD]["correlation_id"])  # generated    assert headers == {"baz": 2}  # not changed inplacedef test_apply_async_headers_already_present(mocked_apply_async):    BaseTask().apply_async(args="foo", kwargs="bar", headers={HEADER_FIELD: "foo"})    assert mocked_apply_async.call_count == 1    _, kwargs = mocked_apply_async.call_args    assert kwargs["headers"] == {HEADER_FIELD: "foo"}
 |