# REST resource for books, plus a handful of endpoints that accept or return
# non-JSON payloads (plain text, form data, file upload, path parameter).
#
# NOTE: the models and endpoints below are documented with comments rather
# than docstrings on purpose: FastAPI and pydantic copy docstrings into the
# generated OpenAPI schema, so adding them would change the published API
# description.

from http import HTTPStatus
from typing import Optional

from clean_python import DoesNotExist
from clean_python import Page
from clean_python import ValueObject
from clean_python.fastapi import delete
from clean_python.fastapi import get
from clean_python.fastapi import patch
from clean_python.fastapi import post
from clean_python.fastapi import put
from clean_python.fastapi import RequestQuery
from clean_python.fastapi import Resource
from clean_python.fastapi import v
from fastapi import Depends
from fastapi import Form
from fastapi import Response
from fastapi import UploadFile

from .application import ManageBook
from .domain import Author
from .domain import Book


# Request body for creating a book; both fields are required.
class BookCreate(ValueObject):
    author: Author
    title: str


# Request body for a partial update; every field is optional so that a
# client can send only the fields it wants to change.
class BookUpdate(ValueObject):
    author: Optional[Author] = None
    title: Optional[str] = None


# Version 1 of the "books" resource. All book operations are delegated to a
# ManageBook instance created in the constructor.
class V1Books(Resource, version=v(1), name="books"):
    def __init__(self):
        self.manager = ManageBook()

    # Return one page of books. No filters are applied (empty filter list);
    # paging options are derived from the request's query parameters.
    @get("/books", response_model=Page[Book])
    async def list(self, q: RequestQuery = Depends()):
        return await self.manager.filter([], q.as_page_options())

    # Create a book from the request body and respond with 201 CREATED.
    @post("/books", status_code=HTTPStatus.CREATED, response_model=Book)
    async def create(self, obj: BookCreate):
        return await self.manager.create(obj.model_dump())

    # Fetch a single book by its id.
    @get("/books/{id}", response_model=Book)
    async def retrieve(self, id: int):
        return await self.manager.retrieve(id)

    # Partially update a book. ``exclude_unset=True`` forwards only the
    # fields the client actually sent, so omitted fields are not passed to
    # the manager as explicit ``None`` values.
    @patch("/books/{id}", response_model=Book)
    async def update(self, id: int, obj: BookUpdate):
        return await self.manager.update(id, obj.model_dump(exclude_unset=True))

    # Delete a book and respond with 204 NO CONTENT. A falsy result from the
    # manager is treated as "nothing was deleted" and raises DoesNotExist.
    @delete("/books/{id}", status_code=HTTPStatus.NO_CONTENT, response_class=Response)
    async def destroy(self, id: int):
        if not await self.manager.destroy(id):
            raise DoesNotExist("object", id)

    # Return a fixed plain-text response (bypasses JSON serialization by
    # returning a Response object directly).
    @get("/text")
    async def text(self):
        return Response("foo", media_type="text/plain")

    # Accept a form-encoded ``name`` field and echo it back as an Author.
    @post("/form", response_model=Author)
    async def form(self, name: str = Form()):
        return {"name": name}

    # Accept an uploaded file and return a mapping of its filename to its
    # contents decoded as text (default codec of ``bytes.decode``).
    @post("/file")
    async def file(self, file: UploadFile):
        return {file.filename: (await file.read()).decode()}

    # Echo the ``name`` path parameter back as an Author.
    @put("/urlencode/{name}", response_model=Author)
    async def urlencode(self, name: str):
        return {"name": name}