| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 | import json as json_libfrom http import HTTPStatusfrom typing import Callablefrom typing import Dictfrom typing import Optionalfrom urllib.parse import quotefrom pydantic import AnyHttpUrlfrom urllib3 import PoolManagerfrom urllib3 import Retryfrom clean_python import Jsonfrom .api_provider import add_query_paramsfrom .api_provider import check_exceptionfrom .api_provider import FileFormPostfrom .api_provider import is_json_content_typefrom .api_provider import joinfrom .exceptions import ApiExceptionfrom .response import Response__all__ = ["SyncApiProvider"]class SyncApiProvider:    """Basic JSON API provider with retry policy and bearer tokens.    The default retry policy has 3 retries with 1, 2, 4 second intervals.    Args:        url: The url of the API (with trailing slash)        headers_factory: Callable that returns headers (for e.g. authorization)        retries: Total number of retries per request        backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)        trailing_slash: Wether to automatically add or remove trailing slashes.    """    def __init__(        self,        url: AnyHttpUrl,        headers_factory: Optional[Callable[[], Dict[str, str]]] = None,        retries: int = 3,        backoff_factor: float = 1.0,        trailing_slash: bool = False,    ):        self._url = str(url)        if not self._url.endswith("/"):            self._url += "/"        self._headers_factory = headers_factory        self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))        self._trailing_slash = trailing_slash    def _request(        self,        method: str,        path: str,        params: Optional[Json],        json: Optional[Json],        fields: Optional[Json],        file: Optional[FileFormPost],        headers: Optional[Dict[str, str]],        timeout: float,    ):        actual_headers = {}        if self._headers_factory is not None:            actual_headers.update(self._headers_factory())        if headers:            actual_headers.update(headers)        request_kwargs = {            "method": method,            "url": add_query_params(                join(self._url, quote(path), self._trailing_slash), params            ),            "timeout": timeout,        }        # for urllib3<2, we dump json ourselves        if json is not None and fields is not None:            raise ValueError("Cannot both specify 'json' and 'fields'")        elif json is not None and file is not None:            raise ValueError("Cannot both specify 'json' and 'file'")        elif json is not None:            request_kwargs["body"] = json_lib.dumps(json).encode()            actual_headers["Content-Type"] = "application/json"        elif fields is not None and file is None:            request_kwargs["fields"] = fields            request_kwargs["encode_multipart"] = False        elif file is not None:            request_kwargs["fields"] = {                file.field_name: (                    file.file_name,                    file.file.read(),                    file.content_type,                ),                **(fields or {}),            }            request_kwargs["encode_multipart"] = True        return self._pool.request(headers=actual_headers, **request_kwargs)    def request(        self,        method: str,        path: str,        params: Optional[Json] = None,        json: Optional[Json] = None,        fields: Optional[Json] = None,        file: Optional[FileFormPost] = None,        headers: Optional[Dict[str, str]] = None,        timeout: float = 5.0,    ) -> Optional[Json]:        response = self._request(            method, path, params, json, fields, file, headers, timeout        )        status = HTTPStatus(response.status)        content_type = response.headers.get("Content-Type")        if status is HTTPStatus.NO_CONTENT:            return None        if not is_json_content_type(content_type):            raise ApiException(                f"Unexpected content type '{content_type}'", status=status            )        body = json_lib.loads(response.data.decode())        check_exception(status, body)        return body    def request_raw(        self,        method: str,        path: str,        params: Optional[Json] = None,        json: Optional[Json] = None,        fields: Optional[Json] = None,        file: Optional[FileFormPost] = None,        headers: Optional[Dict[str, str]] = None,        timeout: float = 5.0,    ) -> Response:        response = self._request(            method, path, params, json, fields, file, headers, timeout        )        return Response(            status=response.status,            data=response.data,            content_type=response.headers.get("Content-Type"),        )
 |