sync_api_provider.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import json as json_lib
  2. from http import HTTPStatus
  3. from typing import Callable
  4. from typing import Dict
  5. from typing import Optional
  6. from urllib.parse import quote
  7. from pydantic import AnyHttpUrl
  8. from urllib3 import PoolManager
  9. from urllib3 import Retry
  10. from clean_python import Json
  11. from .api_provider import add_query_params
  12. from .api_provider import check_exception
  13. from .api_provider import FileFormPost
  14. from .api_provider import is_json_content_type
  15. from .api_provider import join
  16. from .api_provider import RETRY_METHODS
  17. from .api_provider import RETRY_STATUSES
  18. from .exceptions import ApiException
  19. from .response import Response
  20. __all__ = ["SyncApiProvider"]
  21. class SyncApiProvider:
  22. """Basic JSON API provider with retry policy and bearer tokens.
  23. The default retry policy has 3 retries with 1, 2, 4 second intervals.
  24. Args:
  25. url: The url of the API (with trailing slash)
  26. headers_factory: Callable that returns headers (for e.g. authorization)
  27. retries: Total number of retries per request
  28. backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
  29. trailing_slash: Wether to automatically add or remove trailing slashes.
  30. """
  31. def __init__(
  32. self,
  33. url: AnyHttpUrl,
  34. headers_factory: Optional[Callable[[], Dict[str, str]]] = None,
  35. retries: int = 3,
  36. backoff_factor: float = 1.0,
  37. trailing_slash: bool = False,
  38. ):
  39. self._url = str(url)
  40. if not self._url.endswith("/"):
  41. self._url += "/"
  42. self._headers_factory = headers_factory
  43. self._pool = PoolManager(
  44. retries=Retry(
  45. retries,
  46. backoff_factor=backoff_factor,
  47. status_forcelist=RETRY_STATUSES,
  48. allowed_methods=RETRY_METHODS,
  49. )
  50. )
  51. self._trailing_slash = trailing_slash
  52. def _request(
  53. self,
  54. method: str,
  55. path: str,
  56. params: Optional[Json],
  57. json: Optional[Json],
  58. fields: Optional[Json],
  59. file: Optional[FileFormPost],
  60. headers: Optional[Dict[str, str]],
  61. timeout: float,
  62. ):
  63. actual_headers = {}
  64. if self._headers_factory is not None:
  65. actual_headers.update(self._headers_factory())
  66. if headers:
  67. actual_headers.update(headers)
  68. request_kwargs = {
  69. "method": method,
  70. "url": add_query_params(
  71. join(self._url, quote(path), self._trailing_slash), params
  72. ),
  73. "timeout": timeout,
  74. }
  75. # for urllib3<2, we dump json ourselves
  76. if json is not None and fields is not None:
  77. raise ValueError("Cannot both specify 'json' and 'fields'")
  78. elif json is not None and file is not None:
  79. raise ValueError("Cannot both specify 'json' and 'file'")
  80. elif json is not None:
  81. request_kwargs["body"] = json_lib.dumps(json).encode()
  82. actual_headers["Content-Type"] = "application/json"
  83. elif fields is not None and file is None:
  84. request_kwargs["fields"] = fields
  85. request_kwargs["encode_multipart"] = False
  86. elif file is not None:
  87. request_kwargs["fields"] = {
  88. file.field_name: (
  89. file.file_name,
  90. file.file.read(),
  91. file.content_type,
  92. ),
  93. **(fields or {}),
  94. }
  95. request_kwargs["encode_multipart"] = True
  96. return self._pool.request(headers=actual_headers, **request_kwargs)
  97. def request(
  98. self,
  99. method: str,
  100. path: str,
  101. params: Optional[Json] = None,
  102. json: Optional[Json] = None,
  103. fields: Optional[Json] = None,
  104. file: Optional[FileFormPost] = None,
  105. headers: Optional[Dict[str, str]] = None,
  106. timeout: float = 5.0,
  107. ) -> Optional[Json]:
  108. response = self._request(
  109. method, path, params, json, fields, file, headers, timeout
  110. )
  111. status = HTTPStatus(response.status)
  112. content_type = response.headers.get("Content-Type")
  113. if status is HTTPStatus.NO_CONTENT:
  114. return None
  115. if not is_json_content_type(content_type):
  116. raise ApiException(
  117. f"Unexpected content type '{content_type}'", status=status
  118. )
  119. body = json_lib.loads(response.data.decode())
  120. check_exception(status, body)
  121. return body
  122. def request_raw(
  123. self,
  124. method: str,
  125. path: str,
  126. params: Optional[Json] = None,
  127. json: Optional[Json] = None,
  128. fields: Optional[Json] = None,
  129. file: Optional[FileFormPost] = None,
  130. headers: Optional[Dict[str, str]] = None,
  131. timeout: float = 5.0,
  132. ) -> Response:
  133. response = self._request(
  134. method, path, params, json, fields, file, headers, timeout
  135. )
  136. return Response(
  137. status=response.status,
  138. data=response.data,
  139. content_type=response.headers.get("Content-Type"),
  140. )