123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- import json as json_lib
- from http import HTTPStatus
- from typing import Callable
- from typing import Dict
- from typing import Optional
- from urllib.parse import quote
- from pydantic import AnyHttpUrl
- from urllib3 import PoolManager
- from urllib3 import Retry
- from clean_python import Json
- from .api_provider import add_query_params
- from .api_provider import check_exception
- from .api_provider import FileFormPost
- from .api_provider import is_json_content_type
- from .api_provider import join
- from .api_provider import RETRY_METHODS
- from .api_provider import RETRY_STATUSES
- from .exceptions import ApiException
- from .response import Response
- __all__ = ["SyncApiProvider"]
class SyncApiProvider:
    """Synchronous JSON API provider with a retry policy and pluggable headers.

    Requests are sent through a ``urllib3.PoolManager`` whose ``Retry`` policy
    is built from ``retries`` and ``backoff_factor`` and is restricted to
    ``RETRY_STATUSES`` and ``RETRY_METHODS``.

    Args:
        url: The base url of the API. A trailing slash is appended if missing.
        headers_factory: Callable that returns headers (for e.g. authorization).
            It is invoked once for every request that passes validation.
        retries: Total number of retries per request.
        backoff_factor: Multiplier for retry delay times.
        trailing_slash: Whether to automatically add or remove trailing
            slashes; forwarded to ``join`` when the request url is built.
    """

    def __init__(
        self,
        url: AnyHttpUrl,
        headers_factory: Optional[Callable[[], Dict[str, str]]] = None,
        retries: int = 3,
        backoff_factor: float = 1.0,
        trailing_slash: bool = False,
    ):
        # Normalize the base url so it always ends with exactly what was
        # given plus a trailing slash when that slash was missing.
        self._url = str(url)
        if not self._url.endswith("/"):
            self._url += "/"
        self._headers_factory = headers_factory
        self._pool = PoolManager(
            retries=Retry(
                retries,
                backoff_factor=backoff_factor,
                status_forcelist=RETRY_STATUSES,
                allowed_methods=RETRY_METHODS,
            )
        )
        self._trailing_slash = trailing_slash

    def _request(
        self,
        method: str,
        path: str,
        params: Optional[Json],
        json: Optional[Json],
        fields: Optional[Json],
        file: Optional[FileFormPost],
        headers: Optional[Dict[str, str]],
        timeout: float,
    ):
        """Build and send one request; return the response of the pool.

        The body is chosen from the arguments as follows: ``json`` is dumped
        and sent as ``application/json``; ``file`` (optionally together with
        ``fields``) is sent multipart-encoded; ``fields`` alone is sent
        without multipart encoding.

        Raises:
            ValueError: If ``json`` is combined with ``fields`` or ``file``.
        """
        # Reject conflicting body arguments up front, before the headers
        # factory is invoked and before the url is built.
        if json is not None and fields is not None:
            raise ValueError("Cannot both specify 'json' and 'fields'")
        if json is not None and file is not None:
            raise ValueError("Cannot both specify 'json' and 'file'")

        # Per-request headers take precedence over the factory's headers.
        actual_headers = {}
        if self._headers_factory is not None:
            actual_headers.update(self._headers_factory())
        if headers:
            actual_headers.update(headers)

        request_kwargs = {
            "method": method,
            "url": add_query_params(
                join(self._url, quote(path), self._trailing_slash), params
            ),
            "timeout": timeout,
        }

        if json is not None:
            # for urllib3<2, we dump json ourselves
            request_kwargs["body"] = json_lib.dumps(json).encode()
            actual_headers["Content-Type"] = "application/json"
        elif file is not None:
            # The file content is read fully into memory here. Entries in
            # 'fields' are merged after the file entry, so a field with the
            # same name as file.field_name replaces the file entry.
            request_kwargs["fields"] = {
                file.field_name: (
                    file.file_name,
                    file.file.read(),
                    file.content_type,
                ),
                **(fields or {}),
            }
            request_kwargs["encode_multipart"] = True
        elif fields is not None:
            request_kwargs["fields"] = fields
            request_kwargs["encode_multipart"] = False

        return self._pool.request(headers=actual_headers, **request_kwargs)

    def request(
        self,
        method: str,
        path: str,
        params: Optional[Json] = None,
        json: Optional[Json] = None,
        fields: Optional[Json] = None,
        file: Optional[FileFormPost] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: float = 5.0,
    ) -> Optional[Json]:
        """Send a request and return the decoded JSON response body.

        Returns:
            ``None`` for a 204 (No Content) response, otherwise the parsed
            JSON body after it has been passed through ``check_exception``.

        Raises:
            ValueError: If ``json`` is combined with ``fields`` or ``file``.
            ApiException: If the response does not have a JSON content type.
        """
        response = self._request(
            method, path, params, json, fields, file, headers, timeout
        )
        status = HTTPStatus(response.status)
        content_type = response.headers.get("Content-Type")
        # A 204 has no body, so return before the content type is inspected.
        if status is HTTPStatus.NO_CONTENT:
            return None
        if not is_json_content_type(content_type):
            raise ApiException(
                f"Unexpected content type '{content_type}'", status=status
            )
        body = json_lib.loads(response.data.decode())
        check_exception(status, body)
        return body

    def request_raw(
        self,
        method: str,
        path: str,
        params: Optional[Json] = None,
        json: Optional[Json] = None,
        fields: Optional[Json] = None,
        file: Optional[FileFormPost] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: float = 5.0,
    ) -> Response:
        """Send a request and return status, raw data and content type.

        Unlike ``request``, the response body is not decoded and neither the
        status nor the content type is checked.

        Raises:
            ValueError: If ``json`` is combined with ``fields`` or ``file``.
        """
        response = self._request(
            method, path, params, json, fields, file, headers, timeout
        )
        return Response(
            status=response.status,
            data=response.data,
            content_type=response.headers.get("Content-Type"),
        )
|