sync_api_provider.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import json as json_lib
  2. from http import HTTPStatus
  3. from typing import Callable
  4. from typing import Dict
  5. from typing import Optional
  6. from urllib.parse import quote
  7. from pydantic import AnyHttpUrl
  8. from urllib3 import PoolManager
  9. from urllib3 import Retry
  10. from clean_python import Json
  11. from .api_provider import add_query_params
  12. from .api_provider import check_exception
  13. from .api_provider import FileFormPost
  14. from .api_provider import is_json_content_type
  15. from .api_provider import join
  16. from .exceptions import ApiException
  17. from .response import Response
  18. __all__ = ["SyncApiProvider"]
  19. class SyncApiProvider:
  20. """Basic JSON API provider with retry policy and bearer tokens.
  21. The default retry policy has 3 retries with 1, 2, 4 second intervals.
  22. Args:
  23. url: The url of the API (with trailing slash)
  24. headers_factory: Callable that returns headers (for e.g. authorization)
  25. retries: Total number of retries per request
  26. backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
  27. trailing_slash: Wether to automatically add or remove trailing slashes.
  28. """
  29. def __init__(
  30. self,
  31. url: AnyHttpUrl,
  32. headers_factory: Optional[Callable[[], Dict[str, str]]] = None,
  33. retries: int = 3,
  34. backoff_factor: float = 1.0,
  35. trailing_slash: bool = False,
  36. ):
  37. self._url = str(url)
  38. if not self._url.endswith("/"):
  39. self._url += "/"
  40. self._headers_factory = headers_factory
  41. self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))
  42. self._trailing_slash = trailing_slash
  43. def _request(
  44. self,
  45. method: str,
  46. path: str,
  47. params: Optional[Json],
  48. json: Optional[Json],
  49. fields: Optional[Json],
  50. file: Optional[FileFormPost],
  51. timeout: float,
  52. ):
  53. headers = {}
  54. request_kwargs = {
  55. "method": method,
  56. "url": add_query_params(
  57. join(self._url, quote(path), self._trailing_slash), params
  58. ),
  59. "timeout": timeout,
  60. }
  61. # for urllib3<2, we dump json ourselves
  62. if json is not None and fields is not None:
  63. raise ValueError("Cannot both specify 'json' and 'fields'")
  64. elif json is not None and file is not None:
  65. raise ValueError("Cannot both specify 'json' and 'file'")
  66. elif json is not None:
  67. request_kwargs["body"] = json_lib.dumps(json).encode()
  68. headers["Content-Type"] = "application/json"
  69. elif fields is not None and file is None:
  70. request_kwargs["fields"] = fields
  71. request_kwargs["encode_multipart"] = False
  72. elif file is not None:
  73. request_kwargs["fields"] = {
  74. file.field_name: (
  75. file.file_name,
  76. file.file.read(),
  77. file.content_type,
  78. ),
  79. **(fields or {}),
  80. }
  81. request_kwargs["encode_multipart"] = True
  82. if self._headers_factory is not None:
  83. headers.update(self._headers_factory())
  84. return self._pool.request(headers=headers, **request_kwargs)
  85. def request(
  86. self,
  87. method: str,
  88. path: str,
  89. params: Optional[Json] = None,
  90. json: Optional[Json] = None,
  91. fields: Optional[Json] = None,
  92. file: Optional[FileFormPost] = None,
  93. timeout: float = 5.0,
  94. ) -> Optional[Json]:
  95. response = self._request(method, path, params, json, fields, file, timeout)
  96. status = HTTPStatus(response.status)
  97. content_type = response.headers.get("Content-Type")
  98. if status is HTTPStatus.NO_CONTENT:
  99. return None
  100. if not is_json_content_type(content_type):
  101. raise ApiException(
  102. f"Unexpected content type '{content_type}'", status=status
  103. )
  104. body = json_lib.loads(response.data.decode())
  105. check_exception(status, body)
  106. return body
  107. def request_raw(
  108. self,
  109. method: str,
  110. path: str,
  111. params: Optional[Json] = None,
  112. json: Optional[Json] = None,
  113. fields: Optional[Json] = None,
  114. file: Optional[FileFormPost] = None,
  115. timeout: float = 5.0,
  116. ) -> Response:
  117. response = self._request(method, path, params, json, fields, file, timeout)
  118. return Response(
  119. status=response.status,
  120. data=response.data,
  121. content_type=response.headers.get("Content-Type"),
  122. )