test_s3_gateway.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # -*- coding: utf-8 -*-
  2. # (c) Nelen & Schuurmans
  3. import io
  4. from datetime import datetime
  5. import boto3
  6. import pytest
  7. from botocore.exceptions import ClientError
  8. from clean_python import DoesNotExist
  9. from clean_python import Filter
  10. from clean_python import PageOptions
  11. from clean_python.s3 import S3BucketOptions
  12. from clean_python.s3 import S3BucketProvider
  13. from clean_python.s3 import S3Gateway
  14. @pytest.fixture(scope="session")
  15. def s3_settings(s3_url):
  16. minio_settings = {
  17. "url": s3_url,
  18. "access_key": "cleanpython",
  19. "secret_key": "cleanpython",
  20. "bucket": "cleanpython-test",
  21. "region": None,
  22. }
  23. if not minio_settings["bucket"].endswith("-test"): # type: ignore
  24. pytest.exit("Not running against a test minio bucket?! 😱")
  25. return minio_settings.copy()
  26. @pytest.fixture(scope="session")
  27. def s3_bucket(s3_settings):
  28. s3 = boto3.resource(
  29. "s3",
  30. endpoint_url=s3_settings["url"],
  31. aws_access_key_id=s3_settings["access_key"],
  32. aws_secret_access_key=s3_settings["secret_key"],
  33. )
  34. bucket = s3.Bucket(s3_settings["bucket"])
  35. # ensure existence
  36. try:
  37. bucket.create()
  38. except ClientError as e:
  39. if "BucketAlreadyOwnedByYou" in str(e):
  40. pass
  41. return bucket
  42. @pytest.fixture
  43. def s3_provider(s3_bucket, s3_settings):
  44. # wipe contents before each test
  45. s3_bucket.objects.all().delete()
  46. return S3BucketProvider(S3BucketOptions(**s3_settings))
  47. @pytest.fixture
  48. def s3_gateway(s3_provider):
  49. return S3Gateway(s3_provider)
  50. @pytest.fixture
  51. def object_in_s3(s3_bucket):
  52. s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "object-in-s3")
  53. return "object-in-s3"
  54. @pytest.fixture
  55. def local_file(tmp_path):
  56. path = tmp_path / "test-upload.txt"
  57. path.write_bytes(b"foo")
  58. return path
  59. async def test_upload_file(s3_gateway: S3Gateway, local_file):
  60. object_name = "test-upload-file"
  61. await s3_gateway.upload_file(object_name, local_file)
  62. assert (await s3_gateway.get(object_name))["size"] == 3
  63. async def test_upload_file_does_not_exist(s3_gateway: S3Gateway, tmp_path):
  64. path = tmp_path / "test-upload.txt"
  65. object_name = "test-upload-file"
  66. with pytest.raises(FileNotFoundError):
  67. await s3_gateway.upload_file(object_name, path)
  68. async def test_download_file(s3_gateway: S3Gateway, object_in_s3, tmp_path):
  69. path = tmp_path / "test-download.txt"
  70. await s3_gateway.download_file(object_in_s3, path)
  71. assert path.read_bytes() == b"foo"
  72. async def test_download_file_path_already_exists(
  73. s3_gateway: S3Gateway, object_in_s3, tmp_path
  74. ):
  75. path = tmp_path / "test-download.txt"
  76. path.write_bytes(b"bar")
  77. with pytest.raises(FileExistsError):
  78. await s3_gateway.download_file(object_in_s3, path)
  79. assert path.read_bytes() == b"bar"
  80. async def test_download_file_does_not_exist(s3_gateway: S3Gateway, s3_bucket, tmp_path):
  81. path = tmp_path / "test-download-does-not-exist.txt"
  82. with pytest.raises(DoesNotExist):
  83. await s3_gateway.download_file("some-nonexisting", path)
  84. assert not path.exists()
  85. async def test_remove(s3_gateway: S3Gateway, s3_bucket, object_in_s3):
  86. await s3_gateway.remove(object_in_s3)
  87. assert await s3_gateway.get(object_in_s3) is None
  88. async def test_remove_does_not_exist(s3_gateway: S3Gateway, s3_bucket):
  89. await s3_gateway.remove("non-existing")
  90. @pytest.fixture
  91. def multiple_objects(s3_bucket):
  92. s3_bucket.upload_fileobj(io.BytesIO(b"a"), "raster-1/bla")
  93. s3_bucket.upload_fileobj(io.BytesIO(b"ab"), "raster-2/bla")
  94. s3_bucket.upload_fileobj(io.BytesIO(b"abc"), "raster-2/foo")
  95. s3_bucket.upload_fileobj(io.BytesIO(b"abcde"), "raster-2/bz")
  96. return ["raster-1/bla", "raster-2/bla", "raster-2/foo", "raster-2/bz"]
  97. async def test_remove_multiple(s3_gateway: S3Gateway, multiple_objects):
  98. await s3_gateway.remove_multiple(multiple_objects[:2])
  99. for key in multiple_objects[:2]:
  100. assert await s3_gateway.get(key) is None
  101. for key in multiple_objects[2:]:
  102. assert await s3_gateway.get(key) is not None
  103. async def test_remove_multiple_empty_list(s3_gateway: S3Gateway, s3_bucket):
  104. await s3_gateway.remove_multiple([])
  105. async def test_filter(s3_gateway: S3Gateway, multiple_objects):
  106. actual = await s3_gateway.filter([], params=PageOptions(limit=10))
  107. assert len(actual) == 4
  108. assert actual[0]["id"] == "raster-1/bla"
  109. assert isinstance(actual[0]["last_modified"], datetime)
  110. assert actual[0]["etag"] == "0cc175b9c0f1b6a831c399e269772661"
  111. assert actual[0]["size"] == 1
  112. async def test_filter_empty(s3_gateway: S3Gateway, s3_bucket):
  113. actual = await s3_gateway.filter([], params=PageOptions(limit=10))
  114. assert actual == []
  115. async def test_filter_with_prefix(s3_gateway: S3Gateway, multiple_objects):
  116. actual = await s3_gateway.filter(
  117. [Filter(field="prefix", values=["raster-2/"])], params=PageOptions(limit=10)
  118. )
  119. assert len(actual) == 3
  120. async def test_filter_with_limit(s3_gateway: S3Gateway, multiple_objects):
  121. actual = await s3_gateway.filter([], params=PageOptions(limit=2))
  122. assert len(actual) == 2
  123. assert actual[0]["id"] == "raster-1/bla"
  124. assert actual[1]["id"] == "raster-2/bla"
  125. async def test_filter_with_cursor(s3_gateway: S3Gateway, multiple_objects):
  126. actual = await s3_gateway.filter(
  127. [], params=PageOptions(limit=3, cursor="raster-2/bla")
  128. )
  129. assert len(actual) == 2
  130. assert actual[0]["id"] == "raster-2/bz"
  131. assert actual[1]["id"] == "raster-2/foo"
  132. async def test_get(s3_gateway: S3Gateway, object_in_s3):
  133. actual = await s3_gateway.get(object_in_s3)
  134. assert actual["id"] == "object-in-s3"
  135. assert isinstance(actual["last_modified"], datetime)
  136. assert actual["etag"] == "acbd18db4cc2f85cedef654fccc4a4d8"
  137. assert actual["size"] == 3
  138. async def test_get_does_not_exist(s3_gateway: S3Gateway):
  139. actual = await s3_gateway.get("non-existing")
  140. assert actual is None