|
@@ -1,6 +1,7 @@
|
|
# (c) Nelen & Schuurmans
|
|
# (c) Nelen & Schuurmans
|
|
|
|
|
|
import logging
|
|
import logging
|
|
|
|
+from typing import Any
|
|
from typing import Dict
|
|
from typing import Dict
|
|
from typing import FrozenSet
|
|
from typing import FrozenSet
|
|
from typing import List
|
|
from typing import List
|
|
@@ -90,22 +91,22 @@ class TokenVerifier(BaseTokenVerifier):
|
|
if authorization is None:
|
|
if authorization is None:
|
|
logger.info("Missing Authorization header")
|
|
logger.info("Missing Authorization header")
|
|
raise Unauthorized()
|
|
raise Unauthorized()
|
|
- token = authorization[7:] if authorization.startswith("Bearer") else None
|
|
|
|
- if token is None:
|
|
|
|
|
|
+ jwt_str = authorization[7:] if authorization.startswith("Bearer") else None
|
|
|
|
+ if jwt_str is None:
|
|
logger.info("Authorization does not start with 'Bearer '")
|
|
logger.info("Authorization does not start with 'Bearer '")
|
|
raise Unauthorized()
|
|
raise Unauthorized()
|
|
|
|
|
|
# Step 1: Confirm the structure of the JWT. This check is part of get_kid since
|
|
# Step 1: Confirm the structure of the JWT. This check is part of get_kid since
|
|
# jwt.get_unverified_header will raise a JWTError if the structure is wrong.
|
|
# jwt.get_unverified_header will raise a JWTError if the structure is wrong.
|
|
try:
|
|
try:
|
|
- key = self.get_key(token) # JSON Web Key
|
|
|
|
|
|
+ key = self.get_key(jwt_str) # JSON Web Key
|
|
except PyJWTError as e:
|
|
except PyJWTError as e:
|
|
logger.info("Token is invalid: %s", e)
|
|
logger.info("Token is invalid: %s", e)
|
|
raise Unauthorized()
|
|
raise Unauthorized()
|
|
# Step 2: Validate the JWT signature and standard claims
|
|
# Step 2: Validate the JWT signature and standard claims
|
|
try:
|
|
try:
|
|
claims = jwt.decode(
|
|
claims = jwt.decode(
|
|
- token,
|
|
|
|
|
|
+ jwt_str,
|
|
key.key,
|
|
key.key,
|
|
algorithms=self.settings.algorithms,
|
|
algorithms=self.settings.algorithms,
|
|
issuer=self.settings.issuer,
|
|
issuer=self.settings.issuer,
|
|
@@ -134,7 +135,7 @@ class TokenVerifier(BaseTokenVerifier):
|
|
"""Return the JSON Web KEY (JWK) corresponding to kid."""
|
|
"""Return the JSON Web KEY (JWK) corresponding to kid."""
|
|
return self.jwk_client.get_signing_key_from_jwt(token)
|
|
return self.jwk_client.get_signing_key_from_jwt(token)
|
|
|
|
|
|
- def verify_token_use(self, claims: Dict) -> None:
|
|
|
|
|
|
+ def verify_token_use(self, claims: Dict[str, Any]) -> None:
|
|
"""Check the token_use claim."""
|
|
"""Check the token_use claim."""
|
|
if claims["token_use"] != "access":
|
|
if claims["token_use"] != "access":
|
|
logger.info("Token has invalid token_use claim: %s", claims["token_use"])
|
|
logger.info("Token has invalid token_use claim: %s", claims["token_use"])
|