# sync_api_provider.py (3.7 KB)

# (line-number gutter of the original listing, 1-115, removed)
  1. import json as json_lib
  2. from http import HTTPStatus
  3. from typing import Callable
  4. from typing import Optional
  5. from urllib.parse import quote
  6. from pydantic import AnyHttpUrl
  7. from urllib3 import PoolManager
  8. from urllib3 import Retry
  9. from clean_python import ctx
  10. from clean_python import Json
  11. from .api_provider import add_query_params
  12. from .api_provider import is_json_content_type
  13. from .api_provider import is_success
  14. from .api_provider import join
  15. from .exceptions import ApiException
  16. from .response import Response
  17. __all__ = ["SyncApiProvider"]
  18. class SyncApiProvider:
  19. """Basic JSON API provider with retry policy and bearer tokens.
  20. The default retry policy has 3 retries with 1, 2, 4 second intervals.
  21. Args:
  22. url: The url of the API (with trailing slash)
  23. fetch_token: Callable that returns a token for a tenant id
  24. retries: Total number of retries per request
  25. backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
  26. """
  27. def __init__(
  28. self,
  29. url: AnyHttpUrl,
  30. fetch_token: Callable[[PoolManager, int], Optional[str]],
  31. retries: int = 3,
  32. backoff_factor: float = 1.0,
  33. ):
  34. self._url = str(url)
  35. assert self._url.endswith("/")
  36. self._fetch_token = fetch_token
  37. self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))
  38. def _request(
  39. self,
  40. method: str,
  41. path: str,
  42. params: Optional[Json],
  43. json: Optional[Json],
  44. fields: Optional[Json],
  45. timeout: float,
  46. ):
  47. assert ctx.tenant is not None
  48. headers = {}
  49. request_kwargs = {
  50. "method": method,
  51. "url": add_query_params(join(self._url, quote(path)), params),
  52. "timeout": timeout,
  53. }
  54. # for urllib3<2, we dump json ourselves
  55. if json is not None and fields is not None:
  56. raise ValueError("Cannot both specify 'json' and 'fields'")
  57. elif json is not None:
  58. request_kwargs["body"] = json_lib.dumps(json).encode()
  59. headers["Content-Type"] = "application/json"
  60. elif fields is not None:
  61. request_kwargs["fields"] = fields
  62. token = self._fetch_token(self._pool, ctx.tenant.id)
  63. if token is not None:
  64. headers["Authorization"] = f"Bearer {token}"
  65. return self._pool.request(headers=headers, **request_kwargs)
  66. def request(
  67. self,
  68. method: str,
  69. path: str,
  70. params: Optional[Json] = None,
  71. json: Optional[Json] = None,
  72. fields: Optional[Json] = None,
  73. timeout: float = 5.0,
  74. ) -> Optional[Json]:
  75. response = self._request(method, path, params, json, fields, timeout)
  76. status = HTTPStatus(response.status)
  77. content_type = response.headers.get("Content-Type")
  78. if status is HTTPStatus.NO_CONTENT:
  79. return None
  80. if not is_json_content_type(content_type):
  81. raise ApiException(
  82. f"Unexpected content type '{content_type}'", status=status
  83. )
  84. body = json_lib.loads(response.data.decode())
  85. if is_success(status):
  86. return body
  87. else:
  88. raise ApiException(body, status=status)
  89. def request_raw(
  90. self,
  91. method: str,
  92. path: str,
  93. params: Optional[Json] = None,
  94. json: Optional[Json] = None,
  95. fields: Optional[Json] = None,
  96. timeout: float = 5.0,
  97. ) -> Response:
  98. response = self._request(method, path, params, json, fields, timeout)
  99. return Response(
  100. status=response.status,
  101. data=response.data,
  102. content_type=response.headers.get("Content-Type"),
  103. )