|
|
@@ -1,5 +1,6 @@
|
|
|
from http import HTTPStatus
|
|
|
from typing import Callable
|
|
|
+from typing import Optional
|
|
|
from urllib.parse import urlencode
|
|
|
from urllib.parse import urljoin
|
|
|
|
|
|
@@ -30,7 +31,7 @@ def join(url: str, path: str) -> str:
|
|
|
return result
|
|
|
|
|
|
|
|
|
-def add_query_params(url: str, params: Json | None) -> str:
|
|
|
+def add_query_params(url: str, params: Optional[Json]) -> str:
|
|
|
if params is None:
|
|
|
return url
|
|
|
return url + "?" + urlencode(params, doseq=True)
|
|
|
@@ -51,7 +52,7 @@ class SyncApiProvider:
|
|
|
def __init__(
|
|
|
self,
|
|
|
url: AnyHttpUrl,
|
|
|
- fetch_token: Callable[[PoolManager, int], str | None],
|
|
|
+ fetch_token: Callable[[PoolManager, int], Optional[str]],
|
|
|
retries: int = 3,
|
|
|
backoff_factor: float = 1.0,
|
|
|
):
|
|
|
@@ -64,11 +65,11 @@ class SyncApiProvider:
|
|
|
self,
|
|
|
method: str,
|
|
|
path: str,
|
|
|
- params: Json | None = None,
|
|
|
- json: Json | None = None,
|
|
|
- fields: Json | None = None,
|
|
|
+ params: Optional[Json] = None,
|
|
|
+ json: Optional[Json] = None,
|
|
|
+ fields: Optional[Json] = None,
|
|
|
timeout: float = 5.0,
|
|
|
- ) -> Json | None:
|
|
|
+ ) -> Optional[Json]:
|
|
|
assert ctx.tenant is not None
|
|
|
url = join(self._url, path)
|
|
|
token = self._fetch_token(self._pool, ctx.tenant.id)
|