api_provider.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import json as json_lib
  2. import re
  3. from http import HTTPStatus
  4. from typing import Callable
  5. from typing import Optional
  6. from urllib.parse import quote
  7. from urllib.parse import urlencode
  8. from urllib.parse import urljoin
  9. from pydantic import AnyHttpUrl
  10. from urllib3 import PoolManager
  11. from urllib3 import Retry
  12. from clean_python import ctx
  13. from clean_python import Json
  14. from .exceptions import ApiException
  15. __all__ = ["SyncApiProvider"]
  16. def is_success(status: HTTPStatus) -> bool:
  17. """Returns True on 2xx status"""
  18. return (int(status) // 100) == 2
  19. JSON_CONTENT_TYPE_REGEX = re.compile(r"^application\/[^+]*[+]?(json);?.*$")
  20. def is_json_content_type(content_type: Optional[str]) -> bool:
  21. if not content_type:
  22. return False
  23. return bool(JSON_CONTENT_TYPE_REGEX.match(content_type))
  24. def join(url: str, path: str) -> str:
  25. """Results in a full url without trailing slash"""
  26. assert url.endswith("/")
  27. assert not path.startswith("/")
  28. result = urljoin(url, path)
  29. if result.endswith("/"):
  30. result = result[:-1]
  31. return result
  32. def add_query_params(url: str, params: Optional[Json]) -> str:
  33. if params is None:
  34. return url
  35. return url + "?" + urlencode(params, doseq=True)
  36. class SyncApiProvider:
  37. """Basic JSON API provider with retry policy and bearer tokens.
  38. The default retry policy has 3 retries with 1, 2, 4 second intervals.
  39. Args:
  40. url: The url of the API (with trailing slash)
  41. fetch_token: Callable that returns a token for a tenant id
  42. retries: Total number of retries per request
  43. backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
  44. """
  45. def __init__(
  46. self,
  47. url: AnyHttpUrl,
  48. fetch_token: Callable[[PoolManager, int], Optional[str]],
  49. retries: int = 3,
  50. backoff_factor: float = 1.0,
  51. ):
  52. self._url = str(url)
  53. assert self._url.endswith("/")
  54. self._fetch_token = fetch_token
  55. self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))
  56. def request(
  57. self,
  58. method: str,
  59. path: str,
  60. params: Optional[Json] = None,
  61. json: Optional[Json] = None,
  62. fields: Optional[Json] = None,
  63. timeout: float = 5.0,
  64. ) -> Optional[Json]:
  65. assert ctx.tenant is not None
  66. headers = {}
  67. request_kwargs = {
  68. "method": method,
  69. "url": add_query_params(join(self._url, quote(path)), params),
  70. "timeout": timeout,
  71. }
  72. # for urllib3<2, we dump json ourselves
  73. if json is not None and fields is not None:
  74. raise ValueError("Cannot both specify 'json' and 'fields'")
  75. elif json is not None:
  76. request_kwargs["body"] = json_lib.dumps(json).encode()
  77. headers["Content-Type"] = "application/json"
  78. elif fields is not None:
  79. request_kwargs["fields"] = fields
  80. token = self._fetch_token(self._pool, ctx.tenant.id)
  81. if token is not None:
  82. headers["Authorization"] = f"Bearer {token}"
  83. response = self._pool.request(headers=headers, **request_kwargs)
  84. status = HTTPStatus(response.status)
  85. content_type = response.headers.get("Content-Type")
  86. if status is HTTPStatus.NO_CONTENT:
  87. return None
  88. if not is_json_content_type(content_type):
  89. raise ApiException(
  90. f"Unexpected content type '{content_type}'", status=status
  91. )
  92. body = json_lib.loads(response.data.decode())
  93. if is_success(status):
  94. return body
  95. else:
  96. raise ApiException(body, status=status)