ソースを参照

Add api_client subpackage

Casper van der Wel 1 年間 前
コミット
595da26504

+ 2 - 0
CHANGES.md

@@ -6,6 +6,8 @@
 
 - Added `SyncGateway`, `SyncRepository`, and `InMemorySyncGateway`.
 
+- Added optional `api_client` subpackage (based on `urllib3`).
+
 
 0.5.1 (2023-09-25)
 ------------------

+ 4 - 0
clean_python/api_client/__init__.py

@@ -0,0 +1,4 @@
+from .api_gateway import *  # NOQA
+from .api_provider import *  # NOQA
+from .exceptions import *  # NOQA
+from .files import *  # NOQA

+ 37 - 0
clean_python/api_client/api_gateway.py

@@ -0,0 +1,37 @@
+import inject
+
+from clean_python import Id
+from clean_python import Json
+
+from .. import SyncGateway
+from .api_provider import SyncApiProvider
+
+__all__ = ["SyncApiGateway"]
+
+
+class SyncApiGateway(SyncGateway):
+    path: str
+
+    def __init__(self, provider_override: SyncApiProvider | None = None):
+        self.provider_override = provider_override
+
+    def __init_subclass__(cls, path: str) -> None:
+        assert not path.startswith("/")
+        assert "{id}" in path
+        cls.path = path
+        super().__init_subclass__()
+
+    @property
+    def provider(self) -> SyncApiProvider:
+        return self.provider_override or inject.instance(SyncApiProvider)
+
+    def get(self, id: Id) -> Json | None:
+        return self.provider.request("GET", self.path.format(id=id))
+
+    def add(self, item: Json) -> Json:
+        result = self.provider.request("POST", self.path.format(id=""), json=item)
+        assert result is not None
+        return result
+
+    def remove(self, id: Id) -> bool:
+        return self.provider.request("DELETE", self.path.format(id=id)) is not None

+ 100 - 0
clean_python/api_client/api_provider.py

@@ -0,0 +1,100 @@
+from http import HTTPStatus
+from typing import Callable
+from urllib.parse import urlencode
+from urllib.parse import urljoin
+
+from pydantic import AnyHttpUrl
+from urllib3 import PoolManager
+from urllib3 import Retry
+
+from clean_python import ctx
+from clean_python import Json
+
+from .exceptions import ApiException
+
+__all__ = ["SyncApiProvider"]
+
+
+def is_success(status: HTTPStatus) -> bool:
+    """Returns True on 2xx status"""
+    return (int(status) // 100) == 2
+
+
+def join(url: str, path: str) -> str:
+    """Results in a full url without trailing slash"""
+    assert url.endswith("/")
+    assert not path.startswith("/")
+    result = urljoin(url, path)
+    if result.endswith("/"):
+        result = result[:-1]
+    return result
+
+
+def add_query_params(url: str, params: Json | None) -> str:
+    if params is None:
+        return url
+    return url + "?" + urlencode(params, doseq=True)
+
+
+class SyncApiProvider:
+    """Basic JSON API provider with retry policy and bearer tokens.
+
+    The default retry policy has 3 retries with 1, 2, 4 second intervals.
+
+    Args:
+        url: The url of the API (with trailing slash)
+        fetch_token: Callable that returns a token for a tenant id
+        retries: Total number of retries per request
+        backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
+    """
+
+    def __init__(
+        self,
+        url: AnyHttpUrl,
+        fetch_token: Callable[[PoolManager, int], str | None],
+        retries: int = 3,
+        backoff_factor: float = 1.0,
+    ):
+        self._url = str(url)
+        assert self._url.endswith("/")
+        self._fetch_token = fetch_token
+        self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))
+
+    def request(
+        self,
+        method: str,
+        path: str,
+        params: Json | None = None,
+        json: Json | None = None,
+        fields: Json | None = None,
+        timeout: float = 5.0,
+    ) -> Json | None:
+        assert ctx.tenant is not None
+        url = join(self._url, path)
+        token = self._fetch_token(self._pool, ctx.tenant.id)
+        headers = {}
+        if token is not None:
+            headers["Authorization"] = f"Bearer {token}"
+        response = self._pool.request(
+            method=method,
+            url=add_query_params(url, params),
+            json=json,
+            fields=fields,
+            headers=headers,
+            timeout=timeout,
+        )
+        status = HTTPStatus(response.status)
+        content_type = response.headers.get("Content-Type")
+        if content_type is None and status is HTTPStatus.NO_CONTENT:
+            return {"status": int(status)}  # we have to return something...
+        if content_type != "application/json":
+            raise ApiException(
+                f"Unexpected content type '{content_type}'", status=status
+            )
+        body = response.json()
+        if status is HTTPStatus.NOT_FOUND:
+            return None
+        elif is_success(status):
+            return body
+        else:
+            raise ApiException(body, status=status)

+ 10 - 0
clean_python/api_client/exceptions.py

@@ -0,0 +1,10 @@
+from http import HTTPStatus
+from typing import Any
+
+__all__ = ["ApiException"]
+
+
+class ApiException(ValueError):
+    def __init__(self, obj: Any, status: HTTPStatus):
+        self.status = status
+        super().__init__(obj)

+ 397 - 0
clean_python/api_client/files.py

@@ -0,0 +1,397 @@
+import base64
+import hashlib
+import logging
+import os
+import re
+from pathlib import Path
+from typing import BinaryIO
+from typing import Callable
+from typing import Optional
+from typing import Tuple
+from typing import Union
+from urllib.parse import urlparse
+
+import urllib3
+
+from .exceptions import ApiException
+
+__all__ = ["download_file", "download_fileobj", "upload_file", "upload_fileobj"]
+
+
+CONTENT_RANGE_REGEXP = re.compile(r"^bytes (\d+)-(\d+)/(\d+|\*)$")
+# Default upload timeout has an increased socket read timeout, because MinIO
+# takes very long for completing the upload for larger files. The limit of 10 minutes
+# should accommodate files up to 150 GB.
+DEFAULT_UPLOAD_TIMEOUT = urllib3.Timeout(connect=5.0, read=600.0)
+
+
+logger = logging.getLogger(__name__)
+
+
+def get_pool(retries: int = 3, backoff_factor: float = 1.0) -> urllib3.PoolManager:
+    """Create a PoolManager with a retry policy.
+
+    The default retry policy has 3 retries with 1, 2, 4 second intervals.
+
+    Args:
+        retries: Total number of retries per request
+        backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
+    """
+    return urllib3.PoolManager(
+        retries=urllib3.util.retry.Retry(retries, backoff_factor=backoff_factor)
+    )
+
+
+def compute_md5(fileobj: BinaryIO, chunk_size: int = 16777216):
+    """Compute the MD5 checksum of a file object."""
+    fileobj.seek(0)
+    hasher = hashlib.md5()
+    for chunk in _iter_chunks(fileobj, chunk_size=chunk_size):
+        hasher.update(chunk)
+    return hasher.digest()
+
+
+def download_file(
+    url: str,
+    target: Path,
+    chunk_size: int = 16777216,
+    timeout: Optional[Union[float, urllib3.Timeout]] = 5.0,
+    pool: Optional[urllib3.PoolManager] = None,
+    callback_func: Optional[Callable[[int, int], None]] = None,
+) -> Tuple[Path, int]:
+    """Download a file to a specified path on disk.
+
+    It is assumed that the file server supports multipart downloads (range
+    requests).
+
+    Args:
+        url: The url to retrieve.
+        target: The location to copy to. If this is an existing file, it is
+            overwritten. If it is a directory, a filename is generated from
+            the filename in the url.
+        chunk_size: The number of bytes per request. Default: 16MB.
+        timeout: The total timeout in seconds.
+        pool: If not supplied, a default connection pool will be
+            created with a retry policy of 3 retries after 1, 2, 4 seconds.
+        callback_func: optional function used to receive: bytes_downloaded, total_bytes
+            for example: def callback(bytes_downloaded: int, total_bytes: int) -> None
+
+    Returns:
+        Tuple of file path, total number of downloaded bytes.
+
+    Raises:
+        ApiException: raised on unexpected server
+            responses (HTTP status codes other than 206, 413, 429, 503)
+        urllib3.exceptions.HTTPError: various low-level HTTP errors that persist
+            after retrying: connection errors, timeouts, decode errors,
+            invalid HTTP headers, payload too large (HTTP 413), too many
+            requests (HTTP 429), service unavailable (HTTP 503)
+    """
+    # cast string to Path if necessary
+    if isinstance(target, str):
+        target = Path(target)
+
+    # if it is a directory, take the filename from the url
+    if target.is_dir():
+        target = target / urlparse(url)[2].rsplit("/", 1)[-1]
+
+    # open the file
+    try:
+        with target.open("wb") as fileobj:
+            size = download_fileobj(
+                url,
+                fileobj,
+                chunk_size=chunk_size,
+                timeout=timeout,
+                pool=pool,
+                callback_func=callback_func,
+            )
+    except Exception:
+        # Clean up a partially downloaded file
+        try:
+            os.remove(target)
+        except FileNotFoundError:
+            pass
+        raise
+
+    return target, size
+
+
+def download_fileobj(
+    url: str,
+    fileobj: BinaryIO,
+    chunk_size: int = 16777216,
+    timeout: Optional[Union[float, urllib3.Timeout]] = 5.0,
+    pool: Optional[urllib3.PoolManager] = None,
+    callback_func: Optional[Callable[[int, int], None]] = None,
+) -> int:
+    """Download a url to a file object using multiple requests.
+
+    It is assumed that the file server supports multipart downloads (range
+    requests).
+
+    Args:
+        url: The url to retrieve.
+        fileobj: The (binary) file object to write into.
+        chunk_size: The number of bytes per request. Default: 16MB.
+        timeout: The total timeout in seconds.
+        pool: If not supplied, a default connection pool will be
+            created with a retry policy of 3 retries after 1, 2, 4 seconds.
+        callback_func: optional function used to receive: bytes_downloaded, total_bytes
+            for example: def callback(bytes_downloaded: int, total_bytes: int) -> None
+
+    Returns:
+        The total number of downloaded bytes.
+
+    Raises:
+        ApiException: raised on unexpected server
+            responses (HTTP status codes other than 206, 413, 429, 503)
+        urllib3.exceptions.HTTPError: various low-level HTTP errors that persist
+            after retrying: connection errors, timeouts, decode errors,
+            invalid HTTP headers, payload too large (HTTP 413), too many
+            requests (HTTP 429), service unavailable (HTTP 503)
+
+        Note that the fileobj might be partially filled with data in case of
+        an exception.
+    """
+    if pool is None:
+        pool = get_pool()
+
+    # Our strategy here is to just start downloading chunks while monitoring
+    # the Content-Range header to check if we're done. Although we could get
+    # the total Content-Length from a HEAD request, not all servers support
+    # that (e.g. Minio).
+    start = 0
+    while True:
+        # download a chunk
+        stop = start + chunk_size - 1
+        headers = {"Range": "bytes={}-{}".format(start, stop)}
+
+        response = pool.request(
+            "GET",
+            url,
+            headers=headers,
+            timeout=timeout,
+        )
+        if response.status == 200:
+            raise ApiException(
+                "The file server does not support multipart downloads.",
+                status=response.status,
+            )
+        elif response.status != 206:
+            raise ApiException("Unexpected status", status=response.status)
+
+        # write to file
+        fileobj.write(response.data)
+
+        # parse content-range header (e.g. "bytes 0-3/7") for next iteration
+        content_range = response.headers["Content-Range"]
+
+        start, stop, total = [
+            int(x) for x in CONTENT_RANGE_REGEXP.findall(content_range)[0]
+        ]
+
+        if callable(callback_func):
+            download_bytes: int = total if stop + 1 >= total else stop
+            callback_func(download_bytes, total)
+
+        if stop + 1 >= total:
+            break
+        start += chunk_size
+
+    return total
+
+
+def upload_file(
+    url: str,
+    file_path: Path,
+    chunk_size: int = 16777216,
+    timeout: Optional[Union[float, urllib3.Timeout]] = None,
+    pool: Optional[urllib3.PoolManager] = None,
+    md5: Optional[bytes] = None,
+    callback_func: Optional[Callable[[int, int], None]] = None,
+) -> int:
+    """Upload a file at specified file path to a url.
+
+    The upload is accompanied by an MD5 hash so that the file server checks
+    the integrity of the file.
+
+    Args:
+        url: The url to upload to.
+        file_path: The file path to read data from.
+        chunk_size: The size of the chunk in the streaming upload. Note that this
+            function does not do multipart upload. Default: 16MB.
+        timeout: The total timeout in seconds. The default is a connect timeout of
+            5 seconds and a read timeout of 10 minutes.
+        pool: If not supplied, a default connection pool will be
+            created with a retry policy of 3 retries after 1, 2, 4 seconds.
+        md5: The MD5 digest (binary) of the file. Supply the MD5 to enable server-side
+            integrity check. Note that when using presigned urls in AWS S3, the md5 hash
+            should be included in the signing procedure.
+        callback_func: optional function used to receive: bytes_uploaded, total_bytes
+            for example: def callback(bytes_uploaded: int, total_bytes: int) -> None
+
+    Returns:
+        The total number of uploaded bytes.
+
+    Raises:
+        IOError: Raised if the provided file is incompatible or empty.
+        ApiException: raised on unexpected server
+            responses (HTTP status codes other than 206, 413, 429, 503)
+        urllib3.exceptions.HTTPError: various low-level HTTP errors that persist
+            after retrying: connection errors, timeouts, decode errors,
+            invalid HTTP headers, payload too large (HTTP 413), too many
+            requests (HTTP 429), service unavailable (HTTP 503)
+    """
+    # cast string to Path if necessary
+    if isinstance(file_path, str):
+        file_path = Path(file_path)
+
+    # open the file
+    with file_path.open("rb") as fileobj:
+        size = upload_fileobj(
+            url,
+            fileobj,
+            chunk_size=chunk_size,
+            timeout=timeout,
+            pool=pool,
+            md5=md5,
+            callback_func=callback_func,
+        )
+
+    return size
+
+
+def _iter_chunks(
+    fileobj: BinaryIO,
+    chunk_size: int,
+    callback_func: Optional[Callable[[int], None]] = None,
+):
+    """Yield chunks from a file stream"""
+    uploaded_bytes: int = 0
+    assert chunk_size > 0
+    while True:
+        data = fileobj.read(chunk_size)
+        if len(data) == 0:
+            break
+        uploaded_bytes += len(data)
+        if callable(callback_func):
+            callback_func(uploaded_bytes)
+        yield data
+
+
+class _SeekableChunkIterator:
+    """A chunk iterator that can be rewound in case of urllib3 retries."""
+
+    def __init__(
+        self,
+        fileobj: BinaryIO,
+        chunk_size: int,
+        callback_func: Optional[Callable[[int], None]] = None,
+    ):
+        self.fileobj = fileobj
+        self.chunk_size = chunk_size
+        self.callback_func = callback_func
+
+    def seek(self, pos: int):
+        return self.fileobj.seek(pos)
+
+    def tell(self):
+        return self.fileobj.tell()
+
+    def __iter__(self):
+        return _iter_chunks(self.fileobj, self.chunk_size, self.callback_func)
+
+
+def upload_fileobj(
+    url: str,
+    fileobj: BinaryIO,
+    chunk_size: int = 16777216,
+    timeout: Optional[Union[float, urllib3.Timeout]] = None,
+    pool: Optional[urllib3.PoolManager] = None,
+    md5: Optional[bytes] = None,
+    callback_func: Optional[Callable[[int, int], None]] = None,
+) -> int:
+    """Upload a file object to a url.
+
+    The upload is accompanied by an MD5 hash so that the file server checks
+    the integrity of the file.
+
+    Args:
+        url: The url to upload to.
+        fileobj: The (binary) file object to read from.
+        chunk_size: The size of the chunk in the streaming upload. Note that this
+            function does not do multipart upload. Default: 16MB.
+        timeout: The total timeout in seconds. The default is a connect timeout of
+            5 seconds and a read timeout of 10 minutes.
+        pool: If not supplied, a default connection pool will be
+            created with a retry policy of 3 retries after 1, 2, 4 seconds.
+        md5: The MD5 digest (binary) of the file. Supply the MD5 to enable server-side
+            integrity check. Note that when using presigned urls in AWS S3, the md5 hash
+            should be included in the signing procedure.
+        callback_func: optional function used to receive: bytes_uploaded, total_bytes
+            for example: def callback(bytes_uploaded: int, total_bytes: int) -> None
+
+    Returns:
+        The total number of uploaded bytes.
+
+    Raises:
+        IOError: Raised if the provided file is incompatible or empty.
+        ApiException: raised on unexpected server
+            responses (HTTP status codes other than 206, 413, 429, 503)
+        urllib3.exceptions.HTTPError: various low-level HTTP errors that persist
+            after retrying: connection errors, timeouts, decode errors,
+            invalid HTTP headers, payload too large (HTTP 413), too many
+            requests (HTTP 429), service unavailable (HTTP 503)
+    """
+    # There are two ways to upload in S3 (Minio):
+    # - PutObject: put the whole object in one time
+    # - multipart upload: requires presigned urls for every part
+    # We can only do the first option as we have no other presigned urls.
+    # So we take the first option, but we do stream the request body in chunks.
+
+    # We will get hard to understand tracebacks if the fileobj is not
+    # in binary mode. So use a trick to see if fileobj is in binary mode:
+    if not isinstance(fileobj.read(0), bytes):
+        raise IOError(
+            "The file object is not in binary mode. Please open with mode='rb'."
+        )
+
+    file_size = fileobj.seek(0, 2)  # go to EOF
+    if file_size == 0:
+        raise IOError("The file object is empty.")
+
+    if pool is None:
+        pool = get_pool()
+
+    fileobj.seek(0)
+
+    def callback(uploaded_bytes: int):
+        if callable(callback_func):
+            if uploaded_bytes > file_size:
+                uploaded_bytes = file_size
+            callback_func(uploaded_bytes, file_size)
+
+    iterable = _SeekableChunkIterator(
+        fileobj,
+        chunk_size=chunk_size,
+        callback_func=callback,
+    )
+
+    # Tested: both Content-Length and Content-MD5 are checked by Minio
+    headers = {
+        "Content-Length": str(file_size),
+    }
+    if md5 is not None:
+        headers["Content-MD5"] = base64.b64encode(md5).decode()
+    response = pool.request(
+        "PUT",
+        url,
+        body=iterable,
+        headers=headers,
+        timeout=DEFAULT_UPLOAD_TIMEOUT if timeout is None else timeout,
+    )
+    if response.status != 200:
+        raise ApiException("Unexpected status", status=response.status)
+
+    return file_size

+ 1 - 0
pyproject.toml

@@ -28,6 +28,7 @@ celery = ["pika"]
 fluentbit = ["fluent-logger"]
 sql = ["sqlalchemy==2.*", "asyncpg"]
 s3 = ["aioboto3", "boto3"]
+api_client = ["urllib3"]
 
 [project.urls]
 homepage = "https://github.com/nens/clean-python"

+ 51 - 0
tests/api_client/test_sync_api_gateway.py

@@ -0,0 +1,51 @@
+from unittest import mock
+
+import pytest
+
+from clean_python.api_client import SyncApiGateway
+from clean_python.api_client import SyncApiProvider
+
+MODULE = "clean_python.api_client.api_provider"
+
+
+class TstSyncApiGateway(SyncApiGateway, path="foo/{id}"):
+    pass
+
+
+@pytest.fixture
+def api_provider():
+    return mock.MagicMock(spec_set=SyncApiProvider)
+
+
+@pytest.fixture
+def api_gateway(api_provider) -> SyncApiGateway:
+    return TstSyncApiGateway(api_provider)
+
+
+def test_get(api_gateway: SyncApiGateway):
+    actual = api_gateway.get(14)
+
+    api_gateway.provider.request.assert_called_once_with("GET", "foo/14")
+    assert actual is api_gateway.provider.request.return_value
+
+
+def test_add(api_gateway: SyncApiGateway):
+    actual = api_gateway.add({"foo": 2})
+
+    api_gateway.provider.request.assert_called_once_with(
+        "POST", "foo/", json={"foo": 2}
+    )
+    assert actual is api_gateway.provider.request.return_value
+
+
+def test_remove(api_gateway: SyncApiGateway):
+    actual = api_gateway.remove(2)
+
+    api_gateway.provider.request.assert_called_once_with("DELETE", "foo/2")
+    assert actual is True
+
+
+def test_remove_does_not_exist(api_gateway: SyncApiGateway):
+    api_gateway.provider.request.return_value = None
+    actual = api_gateway.remove(2)
+    assert actual is False

+ 140 - 0
tests/api_client/test_sync_api_provider.py

@@ -0,0 +1,140 @@
+from http import HTTPStatus
+from unittest import mock
+
+import pytest
+
+from clean_python import ctx
+from clean_python import Tenant
+from clean_python.api_client import ApiException
+from clean_python.api_client import SyncApiProvider
+
+MODULE = "clean_python.api_client.api_provider"
+
+
+@pytest.fixture
+def tenant() -> Tenant:
+    ctx.tenant = Tenant(id=2, name="")
+    return ctx.tenant
+
+
+@pytest.fixture
+def response():
+    response = mock.Mock()
+    response.status = int(HTTPStatus.OK)
+    response.headers = {"Content-Type": "application/json"}
+    return response
+
+
+@pytest.fixture
+def api_provider(tenant, response) -> SyncApiProvider:
+    with mock.patch(MODULE + ".PoolManager"):
+        api_provider = SyncApiProvider(
+            url="http://testserver/foo/",
+            fetch_token=lambda a, b: f"tenant-{b}",
+        )
+        api_provider._pool.request.return_value = response
+        yield api_provider
+
+
+def test_get(api_provider: SyncApiProvider, response):
+    actual = api_provider.request("GET", "")
+
+    assert api_provider._pool.request.call_count == 1
+    assert api_provider._pool.request.call_args[1] == dict(
+        method="GET",
+        url="http://testserver/foo",
+        json=None,
+        fields=None,
+        headers={"Authorization": "Bearer tenant-2"},
+        timeout=5.0,
+    )
+    assert actual == response.json.return_value
+
+
+def test_post_json(api_provider: SyncApiProvider, response):
+    response.status = int(HTTPStatus.CREATED)
+    api_provider._pool.request.return_value = response
+    actual = api_provider.request("POST", "bar", json={"foo": 2})
+
+    assert api_provider._pool.request.call_count == 1
+
+    assert api_provider._pool.request.call_args[1] == dict(
+        method="POST",
+        url="http://testserver/foo/bar",
+        json={"foo": 2},
+        fields=None,
+        headers={"Authorization": "Bearer tenant-2"},
+        timeout=5.0,
+    )
+    assert actual == response.json.return_value
+
+
+@pytest.mark.parametrize(
+    "path,params,expected_url",
+    [
+        ("", None, "http://testserver/foo"),
+        ("bar", None, "http://testserver/foo/bar"),
+        ("bar/", None, "http://testserver/foo/bar"),
+        ("", {"a": 2}, "http://testserver/foo?a=2"),
+        ("bar", {"a": 2}, "http://testserver/foo/bar?a=2"),
+        ("bar/", {"a": 2}, "http://testserver/foo/bar?a=2"),
+        ("", {"a": [1, 2]}, "http://testserver/foo?a=1&a=2"),
+        ("", {"a": 1, "b": "foo"}, "http://testserver/foo?a=1&b=foo"),
+    ],
+)
+def test_url(api_provider: SyncApiProvider, path, params, expected_url):
+    api_provider.request("GET", path, params=params)
+    assert api_provider._pool.request.call_args[1]["url"] == expected_url
+
+
+def test_timeout(api_provider: SyncApiProvider):
+    api_provider.request("POST", "bar", timeout=2.1)
+    assert api_provider._pool.request.call_args[1]["timeout"] == 2.1
+
+
+@pytest.mark.parametrize(
+    "status", [HTTPStatus.OK, HTTPStatus.NOT_FOUND, HTTPStatus.INTERNAL_SERVER_ERROR]
+)
+def test_unexpected_content_type(api_provider: SyncApiProvider, response, status):
+    response.status = int(status)
+    response.headers["Content-Type"] = "text/plain"
+    with pytest.raises(ApiException) as e:
+        api_provider.request("GET", "bar")
+
+    assert e.value.status is status
+    assert str(e.value) == "Unexpected content type 'text/plain'"
+
+
+def test_no_content(api_provider: SyncApiProvider, response):
+    response.status = int(HTTPStatus.NO_CONTENT)
+    response.headers = {}
+
+    actual = api_provider.request("DELETE", "bar/2")
+    assert actual is not None
+
+
+def test_404(api_provider: SyncApiProvider, response):
+    response.status = int(HTTPStatus.NOT_FOUND)
+    actual = api_provider.request("GET", "bar")
+    assert actual is None
+
+
+@pytest.mark.parametrize("status", [HTTPStatus.BAD_REQUEST, HTTPStatus.IM_A_TEAPOT])
+def test_error_response(api_provider: SyncApiProvider, response, status):
+    response.status = int(status)
+
+    with pytest.raises(ApiException) as e:
+        api_provider.request("GET", "bar")
+
+    assert e.value.status is status
+    assert str(e.value) == str(response.json())
+
+
+@mock.patch(MODULE + ".PoolManager", new=mock.Mock())
+def test_no_token(response):
+    api_provider = SyncApiProvider(
+        url="http://testserver/foo/", fetch_token=lambda a, b: None
+    )
+    api_provider._pool.request.return_value = response
+    api_provider.request("GET", "")
+    assert api_provider._pool.request.call_args[1]["headers"] == {}

+ 257 - 0
tests/api_client/test_sync_files.py

@@ -0,0 +1,257 @@
+import io
+from unittest import mock
+
+import pytest
+from urllib3.response import HTTPResponse
+from urllib3.util.request import set_file_position
+
+from clean_python.api_client import ApiException
+from clean_python.api_client import download_file
+from clean_python.api_client import download_fileobj
+from clean_python.api_client import upload_file
+from clean_python.api_client import upload_fileobj
+from clean_python.api_client.files import _SeekableChunkIterator
+from clean_python.api_client.files import DEFAULT_UPLOAD_TIMEOUT
+
+MODULE = "clean_python.api_client.files"
+
+
+@pytest.fixture
+def pool():
+    pool = mock.Mock()
+    return pool
+
+
+@pytest.fixture
+def responses_single():
+    return [
+        HTTPResponse(
+            body=b"X" * 42,
+            headers={"Content-Range": "bytes 0-41/42"},
+            status=206,
+        )
+    ]
+
+
+@pytest.fixture
+def responses_double():
+    return [
+        HTTPResponse(
+            body=b"X" * 64,
+            headers={"Content-Range": "bytes 0-63/65"},
+            status=206,
+        ),
+        HTTPResponse(
+            body=b"X",
+            headers={"Content-Range": "bytes 64-64/65"},
+            status=206,
+        ),
+    ]
+
+
+def test_download_fileobj(pool, responses_single):
+    stream = io.BytesIO()
+    pool.request.side_effect = responses_single
+    download_fileobj("some-url", stream, chunk_size=64, pool=pool)
+
+    pool.request.assert_called_with(
+        "GET",
+        "some-url",
+        headers={"Range": "bytes=0-63"},
+        timeout=5.0,
+    )
+    assert stream.tell() == 42
+
+
+def test_download_fileobj_two_chunks(pool, responses_double):
+    stream = io.BytesIO()
+    pool.request.side_effect = responses_double
+
+    callback_func = mock.Mock()
+
+    download_fileobj(
+        "some-url", stream, chunk_size=64, pool=pool, callback_func=callback_func
+    )
+
+    (_, kwargs1), (_, kwargs2) = pool.request.call_args_list
+    assert kwargs1["headers"] == {"Range": "bytes=0-63"}
+    assert kwargs2["headers"] == {"Range": "bytes=64-127"}
+    assert stream.tell() == 65
+
+    # Check callback func
+    (args1, _), (args2, _) = callback_func.call_args_list
+
+    assert args1 == (63, 65)
+    assert args2 == (65, 65)
+
+
+def test_download_fileobj_no_multipart(pool, responses_single):
+    """The remote server does not support range requests"""
+    responses_single[0].status = 200
+    pool.request.side_effect = responses_single
+
+    with pytest.raises(ApiException) as e:
+        download_fileobj("some-url", None, chunk_size=64, pool=pool)
+
+    assert e.value.status == 200
+    assert str(e.value) == "The file server does not support multipart downloads."
+
+
+def test_download_fileobj_forbidden(pool, responses_single):
+    """The remote server does not support range requests"""
+    responses_single[0].status = 403
+    pool.request.side_effect = responses_single
+
+    with pytest.raises(ApiException) as e:
+        download_fileobj("some-url", None, chunk_size=64, pool=pool)
+
+    assert e.value.status == 403
+
+
+@mock.patch(MODULE + ".download_fileobj")
+def test_download_file(download_fileobj, tmp_path):
+    download_file(
+        "http://domain/a.b", tmp_path / "c.d", chunk_size=64, timeout=3.0, pool="foo"
+    )
+
+    args, kwargs = download_fileobj.call_args
+    assert args[0] == "http://domain/a.b"
+    assert isinstance(args[1], io.IOBase)
+    assert args[1].mode == "wb"
+    assert args[1].name == str(tmp_path / "c.d")
+    assert kwargs["chunk_size"] == 64
+    assert kwargs["timeout"] == 3.0
+    assert kwargs["pool"] == "foo"
+
+
+@mock.patch(MODULE + ".download_fileobj")
+def test_download_file_directory(download_fileobj, tmp_path):
+    # if a target directory is specified, a filename is created from the url
+    download_file("http://domain/a.b", tmp_path, chunk_size=64, timeout=3.0, pool="foo")
+
+    args, kwargs = download_fileobj.call_args
+    assert args[1].name == str(tmp_path / "a.b")
+
+
+@pytest.fixture
+def upload_response():
+    return HTTPResponse(status=200)
+
+
+@pytest.fixture
+def fileobj():
+    stream = io.BytesIO()
+    stream.write(b"X" * 39)
+    stream.seek(0)
+    return stream
+
+
+@pytest.mark.parametrize(
+    "chunk_size,expected_body",
+    [
+        (64, [b"X" * 39]),
+        (39, [b"X" * 39]),
+        (38, [b"X" * 38, b"X"]),
+        (16, [b"X" * 16, b"X" * 16, b"X" * 7]),
+    ],
+)
+def test_upload_fileobj(pool, fileobj, upload_response, chunk_size, expected_body):
+    pool.request.return_value = upload_response
+    upload_fileobj("some-url", fileobj, chunk_size=chunk_size, pool=pool)
+
+    args, kwargs = pool.request.call_args
+    assert args == ("PUT", "some-url")
+    assert list(kwargs["body"]) == expected_body
+    assert kwargs["headers"] == {"Content-Length": "39"}
+    assert kwargs["timeout"] == DEFAULT_UPLOAD_TIMEOUT
+
+
+def test_upload_fileobj_callback(pool, fileobj, upload_response):
+    expected_body = [b"X" * 16, b"X" * 16, b"X" * 7]
+    chunk_size = 16
+
+    pool.request.return_value = upload_response
+    callback_func = mock.Mock()
+
+    upload_fileobj(
+        "some-url",
+        fileobj,
+        chunk_size=chunk_size,
+        pool=pool,
+        callback_func=callback_func,
+    )
+
+    args, kwargs = pool.request.call_args
+    assert args == ("PUT", "some-url")
+    assert list(kwargs["body"]) == expected_body
+    assert kwargs["headers"] == {"Content-Length": "39"}
+    assert kwargs["timeout"] == DEFAULT_UPLOAD_TIMEOUT
+
+    # Check callback_func
+    (args1, _), (args2, _), (args3, _) = callback_func.call_args_list
+    assert args1 == (16, 39)
+    assert args2 == (32, 39)
+    assert args3 == (39, 39)
+
+
+def test_upload_fileobj_with_md5(pool, fileobj, upload_response):
+    pool.request.return_value = upload_response
+    upload_fileobj("some-url", fileobj, pool=pool, md5=b"abcd")
+
+    # base64.b64encode(b"abcd").decode()
+    expected_md5 = "YWJjZA=="
+
+    args, kwargs = pool.request.call_args
+    assert kwargs["headers"] == {"Content-Length": "39", "Content-MD5": expected_md5}
+
+
+def test_upload_fileobj_empty_file():
+    with pytest.raises(IOError, match="The file object is empty."):
+        upload_fileobj("some-url", io.BytesIO())
+
+
+def test_upload_fileobj_non_binary_file():
+    with pytest.raises(IOError, match="The file object is not in binary*"):
+        upload_fileobj("some-url", io.StringIO())
+
+
+def test_upload_fileobj_errors(pool, fileobj, upload_response):
+    upload_response.status = 400
+    pool.request.return_value = upload_response
+    with pytest.raises(ApiException) as e:
+        upload_fileobj("some-url", fileobj, pool=pool)
+
+    assert e.value.status == 400
+
+
+@mock.patch(MODULE + ".upload_fileobj")
+def test_upload_file(upload_fileobj, tmp_path):
+    path = tmp_path / "myfile"
+    with path.open("wb") as f:
+        f.write(b"X")
+
+    upload_file(
+        "http://domain/a.b", path, chunk_size=1234, timeout=3.0, pool="foo", md5=b"abcd"
+    )
+
+    args, kwargs = upload_fileobj.call_args
+    assert args[0] == "http://domain/a.b"
+    assert isinstance(args[1], io.IOBase)
+    assert args[1].mode == "rb"
+    assert args[1].name == str(path)
+    assert kwargs["timeout"] == 3.0
+    assert kwargs["chunk_size"] == 1234
+    assert kwargs["pool"] == "foo"
+    assert kwargs["md5"] == b"abcd"
+
+
+def test_seekable_chunk_iterator():
+    data = b"XYZ"
+    body = _SeekableChunkIterator(io.BytesIO(data), chunk_size=4)
+
+    pos = set_file_position(body, pos=0)
+    assert pos == 0
+    assert list(body) == [data]
+    assert list(body) == []
+    set_file_position(body, pos)
+    assert list(body) == [data]