12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- # (c) Nelen & Schuurmans
- import os
- from contextvars import ContextVar
- from typing import FrozenSet
- from typing import Optional
- from pydantic import AnyUrl
- from pydantic import FileUrl
- from .value_object import ValueObject
- __all__ = ["ctx", "User", "Tenant", "Scope"]
# User exposed through the context (see Context.user).
class User(ValueObject):
    # Identifier of the user; the default context user has id "ANONYMOUS".
    id: str
    # Name of the user; the default context user is named "anonymous".
    name: str
# A scope is an immutable set of strings.
# NOTE(review): presumably the permission scopes attached to a request; it is
# exported via __all__ but not used in this module -- confirm against callers.
Scope = FrozenSet[str]
# Tenant exposed through the context (see Context.tenant); the context holds
# None instead of a Tenant when no tenant has been set.
class Tenant(ValueObject):
    # Integer identifier of the tenant.
    id: int
    # Name of the tenant.
    name: str
class Context:
    """Provide global access to some contextual properties.

    The implementation makes use of python's contextvars, which automatically
    integrates with asyncio tasks (so that each task runs in its own context).
    This makes sure that every request-response cycle is isolated.

    Properties (all readable and writable):
        path: an ``AnyUrl``; defaults to a ``file`` URL built from the working
            directory at the time the ``Context`` was constructed.
        user: a ``User``; defaults to the anonymous user.
        tenant: a ``Tenant`` or ``None``; defaults to ``None``.
    """

    def __init__(self) -> None:
        # A ContextVar does not expose its default, so the defaults are kept on
        # the instance as well; reset() needs them to restore the initial state.
        self._default_path: AnyUrl = FileUrl.build(
            scheme="file", host="/", path=os.getcwd()
        )
        self._default_user: User = User(id="ANONYMOUS", name="anonymous")

        self._path_value: ContextVar[AnyUrl] = ContextVar(
            "path_value",
            default=self._default_path,
        )
        self._user_value: ContextVar[User] = ContextVar(
            "user_value", default=self._default_user
        )
        self._tenant_value: ContextVar[Optional[Tenant]] = ContextVar(
            "tenant_value", default=None
        )

    def reset(self) -> None:
        """Restore path, user and tenant to their defaults in the current context.

        ``ContextVar.reset`` cannot be used here: it requires the token returned
        by an earlier ``set`` call and raises ``TypeError`` when called without
        one. The defaults are therefore written back with ``set``.
        """
        self._path_value.set(self._default_path)
        self._user_value.set(self._default_user)
        self._tenant_value.set(None)

    @property
    def path(self) -> AnyUrl:
        """Return the path URL of the current context."""
        return self._path_value.get()

    @path.setter
    def path(self, value: AnyUrl) -> None:
        self._path_value.set(value)

    @property
    def user(self) -> User:
        """Return the user of the current context."""
        return self._user_value.get()

    @user.setter
    def user(self, value: User) -> None:
        self._user_value.set(value)

    @property
    def tenant(self) -> Optional[Tenant]:
        """Return the tenant of the current context, or None if there is none."""
        return self._tenant_value.get()

    @tenant.setter
    def tenant(self, value: Optional[Tenant]) -> None:
        self._tenant_value.set(value)
# Shared module-level Context instance (exported via __all__). The values it
# exposes are stored in ContextVars, so reads and writes go through the
# currently active contextvars context.
ctx = Context()
|