test_service_auth.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. from http import HTTPStatus
  2. import pytest
  3. from fastapi.testclient import TestClient
  4. from clean_python import ctx
  5. from clean_python import InMemoryGateway
  6. from clean_python.fastapi import get
  7. from clean_python.fastapi import Resource
  8. from clean_python.fastapi import Service
  9. from clean_python.fastapi import v
  10. from clean_python.oauth2 import OAuth2SPAClientSettings
  11. from clean_python.oauth2 import TokenVerifierSettings
  # Small resource with three GET routes that the auth tests in this module hit.
  # NOTE(review): documentation is kept in comments (not docstrings) so that no
  # new text can surface in generated API descriptions.
  class FooResource(Resource, version=v(1), name="testing"):
      # GET /foo: declared without a scope.
      @get("/foo")
      def testing(self):
          return "ok"

      # GET /bar: declared with scope="admin".
      @get("/bar", scope="admin")
      def scoped(self):
          return "ok"

      # GET /context: echoes the path, user and tenant currently held by ``ctx``.
      @get("/context")
      def context(self):
          payload = dict(
              path=str(ctx.path),
              user=ctx.user,
              tenant=ctx.tenant,
          )
          return payload
  @pytest.fixture(params=["noclient", "client"])
  def app(request, settings: TokenVerifierSettings):
      """Build the application under test, once per ``auth_client`` variant.

      The fixture is parametrized: ``"noclient"`` passes ``auth_client=None``
      and ``"client"`` passes an ``OAuth2SPAClientSettings`` instance to
      ``create_app``. ``settings`` is supplied by a fixture that is not
      defined in the visible part of this file.
      """
      mode = request.param
      if mode == "client":
          auth_client = OAuth2SPAClientSettings(
              client_id="123",
              token_url="https://server/token",
              authorization_url="https://server/token",
          )
      elif mode == "noclient":
          auth_client = None

      service = Service(FooResource())
      return service.create_app(
          title="test",
          description="testing",
          hostname="testserver",
          auth=settings,
          auth_client=auth_client,
          access_logger_gateway=InMemoryGateway([]),
      )
  @pytest.fixture
  def client(app):
      """Return a ``TestClient`` wrapped around the ``app`` fixture."""
      test_client = TestClient(app)
      return test_client
  @pytest.mark.usefixtures("jwk_patched")
  def test_no_header(app, client: TestClient):
      """A request sent without an ``Authorization`` header must yield 401."""
      url = app.url_path_for("v1/testing")
      response = client.get(url)
      assert response.status_code == HTTPStatus.UNAUTHORIZED
  @pytest.mark.usefixtures("jwk_patched")
  def test_ok(app, client: TestClient, token_generator):
      """A request carrying a default bearer token must yield 200."""
      url = app.url_path_for("v1/testing")
      auth_headers = {"Authorization": f"Bearer {token_generator()}"}
      response = client.get(url, headers=auth_headers)
      assert response.status_code == HTTPStatus.OK
  @pytest.mark.usefixtures("jwk_patched")
  def test_scoped_ok(app, client: TestClient, token_generator):
      """A token whose scope includes ``admin`` may call the scoped route (200)."""
      url = app.url_path_for("v1/scoped")
      bearer = token_generator(scope="user admin")
      response = client.get(url, headers={"Authorization": f"Bearer {bearer}"})
      assert response.status_code == HTTPStatus.OK
  @pytest.mark.usefixtures("jwk_patched")
  def test_scoped_forbidden(app, client: TestClient, token_generator):
      """A token with only the ``user`` scope is refused on the scoped route (403)."""
      url = app.url_path_for("v1/scoped")
      bearer = token_generator(scope="user")
      response = client.get(url, headers={"Authorization": f"Bearer {bearer}"})
      assert response.status_code == HTTPStatus.FORBIDDEN
  @pytest.mark.usefixtures("jwk_patched")
  def test_context(app, client: TestClient, token_generator):
      """The ``/context`` route echoes the request's path, user and tenant.

      After the request, ``ctx`` as seen from the test must not expose the
      request's user id and must have no tenant.
      """
      url = app.url_path_for("v1/context")
      bearer = token_generator(tenant=2, tenant_name="bar")
      response = client.get(url, headers={"Authorization": f"Bearer {bearer}"})
      assert response.status_code == HTTPStatus.OK

      # NOTE(review): the user values presumably come from token_generator's
      # defaults, which are defined outside this file -- confirm there.
      expected = {
          "path": "http://testserver/v1/context",
          "user": {"id": "foo", "name": "piet"},
          "tenant": {"id": 2, "name": "bar"},
      }
      assert response.json() == expected

      # Outside the request, the context must not carry the request's values.
      assert ctx.user.id != "foo"
      assert ctx.tenant is None
  @pytest.mark.usefixtures("jwk_patched")
  def test_client_credentials_ok(app, client: TestClient, token_generator):
      """A token generated with ``username=None`` and a ``client_id`` must yield 200."""
      url = app.url_path_for("v1/testing")
      bearer = token_generator(username=None, client_id="foo")
      response = client.get(url, headers={"Authorization": f"Bearer {bearer}"})
      assert response.status_code == HTTPStatus.OK
  96. assert response.status_code == HTTPStatus.OK