test_verifier.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # (c) Nelen & Schuurmans
  2. import json
  3. import socket
  4. import time
  5. import urllib.request
  6. from io import BytesIO
  7. from unittest import mock
  8. import jwt
  9. import pytest
  10. from clean_python import PermissionDenied
  11. from clean_python import Unauthorized
  12. from clean_python.oauth2 import Token
  13. from clean_python.oauth2 import TokenVerifier
  14. @pytest.fixture
  15. def patched_verifier(settings, jwk_patched):
  16. return TokenVerifier(settings)
  17. def test_verifier_ok(patched_verifier, token_generator, jwk_patched):
  18. token = token_generator()
  19. verified_token = patched_verifier("Bearer " + token)
  20. assert isinstance(verified_token, Token)
  21. assert verified_token.user.id == "foo"
  22. assert verified_token.tenant is None
  23. assert verified_token.scope == {"user"}
  24. jwk_patched.assert_called_once_with(
  25. "https://some/auth/server/.well-known/jwks.json"
  26. )
  27. def test_verifier_exp_leeway(patched_verifier, token_generator):
  28. token = token_generator(exp=int(time.time()) - 60)
  29. patched_verifier("Bearer " + token)
  30. def test_verifier_multiple_scopes(patched_verifier, token_generator, settings):
  31. token = token_generator(scope=f"scope1 {settings.scope} scope3")
  32. patched_verifier("Bearer " + token)
  33. @pytest.mark.parametrize(
  34. "claim_overrides",
  35. [
  36. {"iss": "https://authserver"},
  37. {"iss": None},
  38. {"scope": "nothing"},
  39. {"scope": None},
  40. {"exp": int(time.time()) - 3600},
  41. {"exp": None},
  42. {"nbf": int(time.time()) + 3600},
  43. {"token_use": "id"},
  44. {"token_use": None},
  45. {"sub": None},
  46. {"username": None},
  47. ],
  48. )
  49. def test_verifier_bad(patched_verifier, token_generator, claim_overrides):
  50. token = token_generator(**claim_overrides)
  51. with pytest.raises(Unauthorized):
  52. patched_verifier("Bearer " + token)
  53. def test_verifier_authorize(patched_verifier, token_generator):
  54. token = token_generator(sub="bar")
  55. with pytest.raises(PermissionDenied):
  56. patched_verifier("Bearer " + token)
  57. @pytest.mark.parametrize("prefix", ["", "foo ", "key ", "bearer ", "Bearer "])
  58. def test_verifier_bad_header_prefix(patched_verifier, token_generator, prefix):
  59. token = token_generator()
  60. with pytest.raises(Unauthorized):
  61. patched_verifier(prefix + token)
  62. @pytest.mark.parametrize("header", ["", None, " "])
  63. def test_verifier_no_header(patched_verifier, header):
  64. with pytest.raises(Unauthorized):
  65. patched_verifier(header)
  66. @mock.patch.object(urllib.request, "urlopen")
  67. def test_get_key_timeout(urlopen, patched_verifier, token_generator, public_key):
  68. def side_effect():
  69. assert socket.getdefaulttimeout() == 0.1
  70. return BytesIO(json.dumps({"keys": [public_key]}).encode())
  71. urlopen.return_value.__enter__.side_effect = side_effect
  72. assert socket.getdefaulttimeout() is None
  73. key = patched_verifier.get_key(token_generator(), timeout=0.1)
  74. assert socket.getdefaulttimeout() is None
  75. assert isinstance(key, jwt.PyJWK)
  76. assert key.key_id == public_key["kid"]
  77. @mock.patch.object(urllib.request, "urlopen")
  78. def test_get_key_invalid_kid(urlopen, settings, token_generator, public_key):
  79. urlopen.return_value.__enter__.return_value = BytesIO(
  80. json.dumps({"keys": []}).encode()
  81. )
  82. with pytest.raises(jwt.exceptions.PyJWTError):
  83. TokenVerifier(settings).get_key(token_generator())