from typing import Optional

from fastapi import Depends
from fastapi import Request
from fastapi.security import HTTPBearer
from fastapi.security import OAuth2AuthorizationCodeBearer

from clean_python import PermissionDenied
from clean_python.oauth2 import BaseTokenVerifier
from clean_python.oauth2 import NoAuthTokenVerifier
from clean_python.oauth2 import OAuth2SPAClientSettings
from clean_python.oauth2 import Token
from clean_python.oauth2 import TokenVerifier
from clean_python.oauth2 import TokenVerifierSettings

__all__ = ["get_token", "RequiresScope"]

# Module-wide verifier used by get_token(). It stays None until set_verifier()
# is called and is reset to None by clear_verifier().
verifier: Optional[BaseTokenVerifier] = None


def clear_verifier() -> None:
    """Reset the module-level verifier to its unconfigured state (None)."""
    global verifier
    verifier = None


def set_verifier(settings: Optional[TokenVerifierSettings]) -> None:
    """Install the module-level verifier used by get_token().

    Args:
        settings: When None, a NoAuthTokenVerifier is installed. Otherwise a
            TokenVerifier constructed from these settings is installed.
    """
    global verifier
    if settings is None:
        verifier = NoAuthTokenVerifier()
    else:
        verifier = TokenVerifier(settings=settings)


def get_token(request: Request) -> Token:
    """A fastapi 'dependable' yielding the validated token

    The raw value of the request's "Authorization" header (None when the
    header is absent) is handed to the module-level verifier, and whatever
    the verifier returns is returned unchanged.

    Raises:
        AssertionError: If no verifier has been installed with set_verifier().
    """
    # Raised explicitly instead of using an `assert` statement, so that the
    # check (and its exception type) survives `python -O`, which strips asserts.
    if verifier is None:
        raise AssertionError("no token verifier configured; call set_verifier() first")
    return verifier(request.headers.get("Authorization"))


class RequiresScope:
    """A fastapi 'dependable' that requires a specific scope on the token.

    Instantiate with the required scope and use the instance as a dependency;
    calling it raises PermissionDenied when the scope is not in the token.
    """

    def __init__(self, scope: str):
        """Store the required scope.

        Args:
            scope: The scope that must be present in the token. It must not
                contain space characters.

        Raises:
            AssertionError: If ``scope`` contains a space.
        """
        # Explicit raise (not `assert`) so the validation is kept under
        # `python -O`; the exception type and message are unchanged.
        if " " in scope:
            raise AssertionError("spaces are not allowed in a scope")
        self.scope = scope

    async def __call__(self, token: Token = Depends(get_token)) -> None:
        """Check that the token obtained through get_token has the scope.

        Raises:
            PermissionDenied: If ``self.scope`` is not in ``token.scope``.
        """
        if self.scope not in token.scope:
            raise PermissionDenied(f"this operation requires '{self.scope}' scope")


class OAuth2SPAClientSchema(OAuth2AuthorizationCodeBearer):
    """A fastapi 'dependable' configuring the openapi schema for the
    OAuth2 Authorization Code Flow with PKCE extension.

    This includes the JWT Bearer token configuration.
    """

    def __init__(self, client: OAuth2SPAClientSettings):
        """Configure the scheme from the client's authorization and token URLs."""
        super().__init__(
            scheme_name="OAuth2 Authorization Code Flow with PKCE",
            # The URLs are converted with str() before being passed on.
            authorizationUrl=str(client.authorization_url),
            tokenUrl=str(client.token_url),
        )

    async def __call__(self) -> None:
        # Overridden as a no-op: per the class docstring this dependable only
        # configures the openapi schema; it takes no request and returns None.
        pass


class JWTBearerTokenSchema(HTTPBearer):
    """A fastapi 'dependable' configuring the openapi schema for JWT Bearer tokens.

    Note: for the client-side OAuth2 flow, use OAuth2SPAClientSchema instead.
    """

    def __init__(self) -> None:
        """Configure the scheme name and bearer format for JWT Bearer tokens."""
        super().__init__(scheme_name="JWT Bearer token", bearerFormat="JWT")

    async def __call__(self) -> None:
        # Overridden as a no-op: per the class docstring this dependable only
        # configures the openapi schema; it takes no request and returns None.
        pass