sync_api_provider.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import json as json_lib
  2. from http import HTTPStatus
  3. from typing import Callable
  4. from typing import Dict
  5. from typing import Optional
  6. from urllib.parse import quote
  7. from pydantic import AnyHttpUrl
  8. from urllib3 import PoolManager
  9. from urllib3 import Retry
  10. from clean_python import Json
  11. from .api_provider import add_query_params
  12. from .api_provider import check_exception
  13. from .api_provider import FileFormPost
  14. from .api_provider import is_json_content_type
  15. from .api_provider import join
  16. from .exceptions import ApiException
  17. from .response import Response
  18. __all__ = ["SyncApiProvider"]
  19. class SyncApiProvider:
  20. """Basic JSON API provider with retry policy and bearer tokens.
  21. The default retry policy has 3 retries with 1, 2, 4 second intervals.
  22. Args:
  23. url: The url of the API (with trailing slash)
  24. fetch_token: Callable that returns a token for a tenant id
  25. retries: Total number of retries per request
  26. backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
  27. """
  28. def __init__(
  29. self,
  30. url: AnyHttpUrl,
  31. fetch_token: Callable[[], Dict[str, str]],
  32. retries: int = 3,
  33. backoff_factor: float = 1.0,
  34. trailing_slash: bool = False,
  35. ):
  36. self._url = str(url)
  37. if not self._url.endswith("/"):
  38. self._url += "/"
  39. self._fetch_token = fetch_token
  40. self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))
  41. self._trailing_slash = trailing_slash
  42. def _request(
  43. self,
  44. method: str,
  45. path: str,
  46. params: Optional[Json],
  47. json: Optional[Json],
  48. fields: Optional[Json],
  49. file: Optional[FileFormPost],
  50. timeout: float,
  51. ):
  52. headers = {}
  53. request_kwargs = {
  54. "method": method,
  55. "url": add_query_params(
  56. join(self._url, quote(path), self._trailing_slash), params
  57. ),
  58. "timeout": timeout,
  59. }
  60. # for urllib3<2, we dump json ourselves
  61. if json is not None and fields is not None:
  62. raise ValueError("Cannot both specify 'json' and 'fields'")
  63. elif json is not None and file is not None:
  64. raise ValueError("Cannot both specify 'json' and 'file'")
  65. elif json is not None:
  66. request_kwargs["body"] = json_lib.dumps(json).encode()
  67. headers["Content-Type"] = "application/json"
  68. elif fields is not None and file is None:
  69. request_kwargs["fields"] = fields
  70. request_kwargs["encode_multipart"] = False
  71. elif file is not None:
  72. request_kwargs["fields"] = {
  73. file.field_name: (
  74. file.file_name,
  75. file.file.read(),
  76. file.content_type,
  77. ),
  78. **(fields or {}),
  79. }
  80. request_kwargs["encode_multipart"] = True
  81. headers.update(self._fetch_token())
  82. return self._pool.request(headers=headers, **request_kwargs)
  83. def request(
  84. self,
  85. method: str,
  86. path: str,
  87. params: Optional[Json] = None,
  88. json: Optional[Json] = None,
  89. fields: Optional[Json] = None,
  90. file: Optional[FileFormPost] = None,
  91. timeout: float = 5.0,
  92. ) -> Optional[Json]:
  93. response = self._request(method, path, params, json, fields, file, timeout)
  94. status = HTTPStatus(response.status)
  95. content_type = response.headers.get("Content-Type")
  96. if status is HTTPStatus.NO_CONTENT:
  97. return None
  98. if not is_json_content_type(content_type):
  99. raise ApiException(
  100. f"Unexpected content type '{content_type}'", status=status
  101. )
  102. body = json_lib.loads(response.data.decode())
  103. check_exception(status, body)
  104. return body
  105. def request_raw(
  106. self,
  107. method: str,
  108. path: str,
  109. params: Optional[Json] = None,
  110. json: Optional[Json] = None,
  111. fields: Optional[Json] = None,
  112. file: Optional[FileFormPost] = None,
  113. timeout: float = 5.0,
  114. ) -> Response:
  115. response = self._request(method, path, params, json, fields, file, timeout)
  116. return Response(
  117. status=response.status,
  118. data=response.data,
  119. content_type=response.headers.get("Content-Type"),
  120. )