Browse Source

Add User to Claims (#5)

Casper van der Wel 1 year ago
parent
commit
08c2dfa318
4 changed files with 26 additions and 9 deletions
  1. 1 1
      CHANGES.md
  2. 7 1
      clean_python/oauth2/claims.py
  3. 11 7
      clean_python/oauth2/oauth2.py
  4. 7 0
      tests/oauth2/test_oauth2.py

+ 1 - 1
CHANGES.md

@@ -4,7 +4,7 @@
 0.2.2 (unreleased)
 ------------------
 
-- Nothing changed yet.
+- Expand ctx.claims with user details.
 
 
 0.2.1 (2023-08-03)

+ 7 - 1
clean_python/oauth2/claims.py

@@ -11,6 +11,12 @@ class Tenant(ValueObject):
     name: str
 
 
+class User(ValueObject):
+    id: str
+    name: Optional[str]
+
+
 class Claims(ValueObject):
-    scope: FrozenSet[str]
+    user: User
     tenant: Optional[Tenant]
+    scope: FrozenSet[str]

+ 11 - 7
clean_python/oauth2/oauth2.py

@@ -16,6 +16,7 @@ from clean_python import Unauthorized
 
 from .claims import Claims
 from .claims import Tenant
+from .claims import User
 
 __all__ = ["TokenVerifier", "TokenVerifierSettings", "OAuth2SPAClientSettings"]
 
@@ -84,13 +85,14 @@ class TokenVerifier:
             raise Unauthorized()
         # Step 3: Verify additional claims. At this point, we have passed
         # verification, so unverified claims may be used safely.
-        scope = self.parse_scope(claims)
+        user = self.parse_user(claims)
         tenant = self.parse_tenant(claims)
+        scope = self.parse_scope(claims)
         self.verify_token_use(claims)
         self.verify_scope(scope)
-        # Step 4: Authorization: verify 'sub' claim against 'admin_users'
-        self.verify_sub(claims)
-        return Claims(scope=scope, tenant=tenant)
+        # Step 4: Authorization: verify user id ('sub' claim) against 'admin_users'
+        self.authorize_user(user)
+        return Claims(user=user, scope=scope, tenant=tenant)
 
     def get_key(self, token) -> jwt.PyJWK:
         """Return the JSON Web KEY (JWK) corresponding to kid."""
@@ -110,14 +112,16 @@ class TokenVerifier:
             # logger.info("Token has invalid scope claim: %s", claims["scope"])
             raise Unauthorized()
 
-    def verify_sub(self, claims: Dict) -> None:
-        """The subject (sub) claim should be in a hard-coded whitelist."""
+    def authorize_user(self, user: User) -> None:
         if self.settings.admin_users is None:
             return
-        if claims.get("sub") not in self.settings.admin_users:
+        if user.id not in self.settings.admin_users:
             # logger.info("User with sub %s is not authorized", claims.get("sub"))
             raise PermissionDenied()
 
+    def parse_user(self, claims: Dict) -> User:
+        return User(id=claims["sub"], name=claims.get("username"))
+
     def parse_scope(self, claims: Dict) -> FrozenSet[str]:
         return frozenset(claims["scope"].split(" "))
 

+ 7 - 0
tests/oauth2/test_oauth2.py

@@ -17,12 +17,19 @@ def patched_verifier(jwk_patched, settings):
 def test_verifier_ok(patched_verifier, token_generator):
     token = token_generator()
     verified_claims = patched_verifier("Bearer " + token)
+    assert verified_claims.user.id == "foo"
     assert verified_claims.tenant is None
     assert verified_claims.scope == {"user"}
 
     patched_verifier.get_key.assert_called_once_with(token)
 
 
+def test_verifier_ok_with_username(patched_verifier, token_generator):
+    token = token_generator(username="sinterklaas")
+    verified_claims = patched_verifier("Bearer " + token)
+    assert verified_claims.user.name == "sinterklaas"
+
+
 def test_verifier_ok_with_tenant(patched_verifier, token_generator):
     token = token_generator(tenant="15")
     verified_claims = patched_verifier("Bearer " + token)