test_oauth2.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # (c) Nelen & Schuurmans
  2. import time
  3. from unittest import mock
  4. import jwt
  5. import pytest
  6. from clean_python import PermissionDenied
  7. from clean_python import Unauthorized
  8. from clean_python.oauth2 import OAuth2AccessTokenVerifier
  9. @pytest.fixture
  10. def settings():
  11. return {
  12. "issuer": "https://cognito-idp.region.amazonaws.com/region_abc123",
  13. "resource_server_id": "localhost/",
  14. "algorithms": ["RS256"],
  15. "admin_users": ["foo"],
  16. }
  17. @pytest.fixture
  18. def private_key():
  19. # this key was generated especially for this test suite; is has no other applications
  20. return {
  21. "p": "_PgJBxrGEy8I5KvY_nDRT9loaBPqHHn0AUiTa92zBrAX0qA8ZhV66pUkX2JehU3efduel4FOK2xx-W31p7kCLoaGsMtfKAPYC33KptCH9YXkeMQHq1jWfcRgAVXpdXc7M4pQxO8Dh2BU8qhtAzhpbP4tUPoLIGcTUGd-1ieDkqE", # NOQA
  22. "kty": "RSA",
  23. "q": "hT0USPCNN4o2PauND53ubh2G5uOHzY9mfPuEXZ1fiRihCe5Bng0K8Pzx5QpSAjUY2-FhHa8jK8ITERmwT3MQKJpmlm_1R8GnaNVPOj8BpAhDlMzgkVikEGj0Pd7x_wdSko7KscyG-ZVsMw_KiCZpC6hMiI60w9GG14MtXhRVWhM", # NOQA
  24. "d": "BNwTHorPcAMiDglxt5Ylz1jqQ67rYcnA0okvZxz0QPbLovuTM1WIaPIeGlqXNzB9NxXtZhHXtnhoSwPf2LxMmYWWgJLqhPQWRlqZhLhww0nGGUgk_b1gNnMQuuh2weLfPNUksddhDJHzW1pBiDQrhP0t064Pz_P8WtGUkBka5-Pb3pItaF_w4xDIhhTJS48kv5H-BrwK8Vlz-EofkmPgxXBvCwhVoXZihxEUVzc6X59e1UiymXr-3lbNeL-76Yb9JHJFjXh2o52v5eZDVT6ir-iUp7bBXTiZsFaBCUCfCjx3MiQkHNBNEV7Cr9DKvfGdK3r9IbkSAC1tiD4Y1oyZwQ", # NOQA
  25. "e": "AQAB",
  26. "use": "sig",
  27. "kid": "_Lfex-skFCKBZd0xMN5dZSAX7uoG6LMx3i2qHReqU0c",
  28. "qi": "GNhYuNdxd4NyRhzreW72PWXzj2oIkm0rIHrcNW9bpqK1fxrsbiVUEVUly-cqpD_-AjFOyCWcKWQxHG7J8LeP2vW3_U4TLx_jKD9cc7S65gb37El1ihOwNWbapRxToOhP2sZa0g3y9P-M_8hQcfKr1OFMQMnD9wj-sVNw9yJf3I4", # NOQA
  29. "dp": "xTs6BrEISEK-w1N9Dvy1JXWToroMKQGojiugzVQAVjGLkWvfS5RpzmZUAo52taZ911EZOHTXlqGpx1jFVGy5176JW2RlH5THqEX-b8tchcBL3yCv_hd4vHwUglYSfMRmgwvPZ4wXC0C_WqaYwA8Gm7UdbepWLIBRHbpjuOL8AaE", # NOQA
  30. "alg": "RS256",
  31. "dq": "C4_UTcwKBRLKSCm10PAce5O2XBzMcQsLkrbkspbwbl4jw0_Yg9WP6H-aogx2N1jSMmppWgETpT1vGCHJietrMIrNcip-914Xn-I6wMws4UYSTzxEFHjDq-TfpOrOxxmkkbEwZ6Ne5xOPUxMAuTXUEb3l_keb6g4pjFQGwM405d8", # NOQA
  32. "n": "g6k31kvFdTaCSxXhazC5JaVekYi836F0H_YLrDioQlwiegsGjUDYk5TM7z8iXwDIm0QZZgtoEBlEny8vXrt1WGMO8GGwnVNq0_ZAD3JYp-a_c0X7VM7I2Dze32zcy8mC4QhPedEbMVDzi1XrusGjNHWObkMKsLZ7RRlwdkgR4nRpzncou_2ZJLvc50C8tjd3juCpUMWXNsvDjoAenxoXs68SDK4h9QSjvaWaSHNRGYiYkGUvcL5rv3htbrHIUVAcBC9r0j5Ued1hBR9ND1KPxVJWnn8oRAxFrYIcQdaDFWnWdb5BY9pJQls9fHlt0PF9vXUm-GufWk0U8D4Lc8V78w", # NOQA
  33. }
  34. @pytest.fixture
  35. def public_key(private_key):
  36. keys = ("alg", "e", "kid", "kty", "n", "use")
  37. return {k: private_key[k] for k in keys}
  38. @pytest.fixture
  39. def patched_verifier(public_key, settings):
  40. verifier = OAuth2AccessTokenVerifier(scope="all", **settings)
  41. with mock.patch.object(verifier, "jwk_client") as jwk_client:
  42. jwk_client.get_signing_key_from_jwt.return_value = jwt.PyJWK.from_dict(
  43. public_key
  44. )
  45. yield verifier
  46. @pytest.fixture
  47. def token_generator(private_key, settings):
  48. default_claims = {
  49. "sub": "foo",
  50. "iss": settings["issuer"],
  51. "scope": f"{settings['resource_server_id']}all",
  52. "token_use": "access",
  53. "exp": int(time.time()) + 3600,
  54. "iat": int(time.time()) - 3600,
  55. "nbf": int(time.time()) - 3600,
  56. }
  57. def generate_token(**claim_overrides):
  58. claims = {**default_claims, **claim_overrides}
  59. claims = {k: v for (k, v) in claims.items() if v is not None}
  60. return jwt.encode(
  61. claims,
  62. key=jwt.PyJWK.from_dict(private_key).key,
  63. algorithm=private_key["alg"],
  64. headers={"kid": private_key["kid"]},
  65. )
  66. return generate_token
  67. def test_verifier_ok(patched_verifier, token_generator):
  68. token = token_generator()
  69. verified_claims = patched_verifier(token)
  70. assert verified_claims == jwt.decode(token, options={"verify_signature": False})
  71. patched_verifier.jwk_client.get_signing_key_from_jwt.assert_called_once_with(token)
  72. def test_verifier_exp_leeway(patched_verifier, token_generator):
  73. token = token_generator(exp=int(time.time()) - 60)
  74. patched_verifier(token)
  75. def test_verifier_multiple_scopes(patched_verifier, token_generator, settings):
  76. token = token_generator(scope=f"scope1 {settings['resource_server_id']}all scope3")
  77. patched_verifier(token)
  78. @pytest.mark.parametrize(
  79. "claim_overrides",
  80. [
  81. {"iss": "https://authserver"},
  82. {"iss": None},
  83. {"scope": "nothing"},
  84. {"scope": None},
  85. {"exp": int(time.time()) - 3600},
  86. {"exp": None},
  87. {"nbf": int(time.time()) + 3600},
  88. {"token_use": "id"},
  89. {"token_use": None},
  90. {"sub": None},
  91. ],
  92. )
  93. def test_verifier_bad(patched_verifier, token_generator, claim_overrides):
  94. token = token_generator(**claim_overrides)
  95. with pytest.raises(Unauthorized):
  96. patched_verifier(token)
  97. def test_verifier_authorize(patched_verifier, token_generator):
  98. token = token_generator(sub="bar")
  99. with pytest.raises(PermissionDenied):
  100. patched_verifier(token)