| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- from http import HTTPStatus
- from typing import Optional
- from fastapi import Depends
- from fastapi import Form
- from fastapi import Response
- from fastapi import UploadFile
- from clean_python import DoesNotExist
- from clean_python import Page
- from clean_python import ValueObject
- from clean_python.fastapi import delete
- from clean_python.fastapi import get
- from clean_python.fastapi import patch
- from clean_python.fastapi import post
- from clean_python.fastapi import put
- from clean_python.fastapi import RequestQuery
- from clean_python.fastapi import Resource
- from clean_python.fastapi import v
- from .application import ManageBook
- from .domain import Author
- from .domain import Book
# Request body for creating a book (consumed by V1Books.create).
# Both fields are required: neither declares a default.
# NOTE(review): documented with comments rather than a docstring because
# ValueObject is presumably pydantic-based, where a class docstring would
# become the schema description -- confirm before converting.
class BookCreate(ValueObject):
    author: Author  # author of the new book
    title: str  # title of the new book
# Request body for partially updating a book (consumed by V1Books.update).
# Every field is optional and defaults to None; the update endpoint dumps
# this object with exclude_unset=True, so only the fields a client actually
# supplied are forwarded to the manager.
# NOTE(review): documented with comments rather than a docstring because
# ValueObject is presumably pydantic-based, where a class docstring would
# become the schema description -- confirm before converting.
class BookUpdate(ValueObject):
    author: Optional[Author] = None  # replacement author, if supplied
    title: Optional[str] = None  # replacement title, if supplied
# Version-1 "books" resource: CRUD endpoints backed by a ManageBook instance,
# plus a handful of small endpoints returning plain text, form data, an
# uploaded file's content and a path parameter.
#
# Comments are used instead of docstrings on purpose: FastAPI copies an
# endpoint's docstring into the generated OpenAPI description, so adding
# docstrings here would alter the published schema.
class V1Books(Resource, version=v(1), name="books"):
    def __init__(self):
        # Each resource instance gets its own manager.
        self.manager = ManageBook()

    # List books; the filter list is empty, paging comes from the query.
    @get("/books", response_model=Page[Book])
    async def list(self, q: RequestQuery = Depends()):
        page_options = q.as_page_options()
        return await self.manager.filter([], page_options)

    # Create a book from the validated request body; responds with 201.
    @post("/books", status_code=HTTPStatus.CREATED, response_model=Book)
    async def create(self, obj: BookCreate):
        values = obj.model_dump()
        return await self.manager.create(values)

    # Fetch a single book by its id.
    @get("/books/{id}", response_model=Book)
    async def retrieve(self, id: int):
        book = await self.manager.retrieve(id)
        return book

    # Partially update a book: only fields present in the request are sent on.
    @patch("/books/{id}", response_model=Book)
    async def update(self, id: int, obj: BookUpdate):
        changes = obj.model_dump(exclude_unset=True)
        return await self.manager.update(id, changes)

    # Delete a book; responds with 204 and no body.
    # Raises DoesNotExist when the manager reports (falsy result) that
    # nothing was destroyed.
    @delete("/books/{id}", status_code=HTTPStatus.NO_CONTENT, response_class=Response)
    async def destroy(self, id: int):
        was_destroyed = await self.manager.destroy(id)
        if was_destroyed:
            return
        raise DoesNotExist("object", id)

    # Return a fixed plain-text body.
    @get("/text")
    async def text(self):
        return Response(content="foo", media_type="text/plain")

    # Echo the submitted form field "name" back in Author shape.
    @post("/form", response_model=Author)
    async def form(self, name: str = Form()):
        return dict(name=name)

    # Return a one-entry mapping of the uploaded file's name to its content,
    # decoded with the default codec of bytes.decode().
    @post("/file")
    async def file(self, file: UploadFile):
        filename = file.filename
        raw = await file.read()
        return {filename: raw.decode()}

    # Echo the "name" path parameter back in Author shape.
    @put("/urlencode/{name}", response_model=Author)
    async def urlencode(self, name: str):
        return dict(name=name)
|