| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 | # -*- coding: utf-8 -*-# (c) Nelen & Schuurmansimport iofrom datetime import datetimefrom unittest import mockimport boto3import pytestfrom botocore.exceptions import ClientErrorfrom clean_python import DoesNotExistfrom clean_python import Filterfrom clean_python import PageOptionsfrom clean_python.s3 import S3BucketOptionsfrom clean_python.s3 import S3BucketProviderfrom clean_python.s3 import S3Gateway@pytest.fixture(scope="session")def s3_settings(s3_url):    minio_settings = {        "url": s3_url,        "access_key": "cleanpython",        "secret_key": "cleanpython",        "bucket": "cleanpython-test",        "region": None,    }    if not minio_settings["bucket"].endswith("-test"):  # type: ignore        pytest.exit("Not running against a test minio bucket?! 😱")    return minio_settings.copy()@pytest.fixture(scope="session")def s3_bucket(s3_settings):    s3 = boto3.resource(        "s3",        endpoint_url=s3_settings["url"],        aws_access_key_id=s3_settings["access_key"],        aws_secret_access_key=s3_settings["secret_key"],    )    bucket = s3.Bucket(s3_settings["bucket"])    # ensure existence    try:        bucket.create()    except ClientError as e:        if "BucketAlreadyOwnedByYou" in str(e):            pass    return bucket@pytest.fixturedef s3_provider(s3_bucket, s3_settings):    # wipe contents before each test    s3_bucket.objects.all().delete()    return S3BucketProvider(S3BucketOptions(**s3_settings))@pytest.fixturedef s3_gateway(s3_provider):    return S3Gateway(s3_provider)@pytest.fixturedef object_in_s3(s3_bucket):    s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "object-in-s3")    return "object-in-s3"@pytest.fixturedef local_file(tmp_path):    path = tmp_path / "test-upload.txt"    path.write_bytes(b"foo")    return pathasync def test_upload_file(s3_gateway: S3Gateway, local_file):    object_name = "test-upload-file"    await s3_gateway.upload_file(object_name, local_file)    assert (await s3_gateway.get(object_name))["size"] == 3async def test_upload_file_does_not_exist(s3_gateway: S3Gateway, tmp_path):    path = tmp_path / "test-upload.txt"    object_name = "test-upload-file"    with pytest.raises(FileNotFoundError):        await s3_gateway.upload_file(object_name, path)async def test_download_file(s3_gateway: S3Gateway, object_in_s3, tmp_path):    path = tmp_path / "test-download.txt"    await s3_gateway.download_file(object_in_s3, path)    assert path.read_bytes() == b"foo"async def test_download_file_path_already_exists(    s3_gateway: S3Gateway, object_in_s3, tmp_path):    path = tmp_path / "test-download.txt"    path.write_bytes(b"bar")    with pytest.raises(FileExistsError):        await s3_gateway.download_file(object_in_s3, path)    assert path.read_bytes() == b"bar"async def test_download_file_does_not_exist(s3_gateway: S3Gateway, s3_bucket, tmp_path):    path = tmp_path / "test-download-does-not-exist.txt"    with pytest.raises(DoesNotExist):        await s3_gateway.download_file("some-nonexisting", path)    assert not path.exists()async def test_remove(s3_gateway: S3Gateway, s3_bucket, object_in_s3):    await s3_gateway.remove(object_in_s3)    assert await s3_gateway.get(object_in_s3) is Noneasync def test_remove_does_not_exist(s3_gateway: S3Gateway, s3_bucket):    await s3_gateway.remove("non-existing")@pytest.fixturedef multiple_objects(s3_bucket):    s3_bucket.upload_fileobj(io.BytesIO(b"a"), "raster-1/bla")    s3_bucket.upload_fileobj(io.BytesIO(b"ab"), "raster-2/bla")    s3_bucket.upload_fileobj(io.BytesIO(b"abc"), "raster-2/foo")    s3_bucket.upload_fileobj(io.BytesIO(b"abcde"), "raster-2/bz")    return ["raster-1/bla", "raster-2/bla", "raster-2/foo", "raster-2/bz"]async def test_remove_multiple(s3_gateway: S3Gateway, multiple_objects):    await s3_gateway.remove_multiple(multiple_objects[:2])    for key in multiple_objects[:2]:        assert await s3_gateway.get(key) is None    for key in multiple_objects[2:]:        assert await s3_gateway.get(key) is not Noneasync def test_remove_multiple_empty_list(s3_gateway: S3Gateway, s3_bucket):    await s3_gateway.remove_multiple([])async def test_remove_filtered_all(s3_gateway: S3Gateway, multiple_objects):    await s3_gateway.remove_filtered([])    for key in multiple_objects:        assert await s3_gateway.get(key) is Noneasync def test_remove_filtered_prefix(s3_gateway: S3Gateway, multiple_objects):    await s3_gateway.remove_filtered([Filter(field="prefix", values=["raster-2/"])])    assert await s3_gateway.get(multiple_objects[0]) is not None    for key in multiple_objects[1:]:        assert await s3_gateway.get(key) is None@mock.patch("clean_python.s3.s3_gateway.AWS_LIMIT", new=1)async def test_remove_filtered_pagination(s3_gateway: S3Gateway, multiple_objects):    await s3_gateway.remove_filtered([Filter(field="prefix", values=["raster-2/"])])    assert await s3_gateway.get(multiple_objects[0]) is not None    for key in multiple_objects[1:]:        assert await s3_gateway.get(key) is Noneasync def test_filter(s3_gateway: S3Gateway, multiple_objects):    actual = await s3_gateway.filter([], params=PageOptions(limit=10))    assert len(actual) == 4    assert actual[0]["id"] == "raster-1/bla"    assert isinstance(actual[0]["last_modified"], datetime)    assert actual[0]["etag"] == "0cc175b9c0f1b6a831c399e269772661"    assert actual[0]["size"] == 1async def test_filter_empty(s3_gateway: S3Gateway, s3_bucket):    actual = await s3_gateway.filter([], params=PageOptions(limit=10))    assert actual == []async def test_filter_with_limit(s3_gateway: S3Gateway, multiple_objects):    actual = await s3_gateway.filter([], params=PageOptions(limit=2))    assert len(actual) == 2    assert actual[0]["id"] == "raster-1/bla"    assert actual[1]["id"] == "raster-2/bla"async def test_filter_with_cursor(s3_gateway: S3Gateway, multiple_objects):    actual = await s3_gateway.filter(        [], params=PageOptions(limit=3, cursor="raster-2/bla")    )    assert len(actual) == 2    assert actual[0]["id"] == "raster-2/bz"    assert actual[1]["id"] == "raster-2/foo"async def test_filter_by_prefix(s3_gateway: S3Gateway, multiple_objects):    actual = await s3_gateway.filter([Filter(field="prefix", values=["raster-1/"])])    assert len(actual) == 1    actual = await s3_gateway.filter([Filter(field="prefix", values=["raster-2/"])])    assert len(actual) == 3async def test_get(s3_gateway: S3Gateway, object_in_s3):    actual = await s3_gateway.get(object_in_s3)    assert actual["id"] == "object-in-s3"    assert isinstance(actual["last_modified"], datetime)    assert actual["etag"] == "acbd18db4cc2f85cedef654fccc4a4d8"    assert actual["size"] == 3async def test_get_does_not_exist(s3_gateway: S3Gateway):    actual = await s3_gateway.get("non-existing")    assert actual is None
 |