123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import os
- import time
- from typing import Awaitable
- from typing import Callable
- from typing import Optional
- import inject
- from starlette.background import BackgroundTasks
- from starlette.requests import Request
- from starlette.responses import Response
- from clean_python import ctx
- from clean_python import Gateway
- from clean_python.fluentbit import FluentbitGateway
- __all__ = ["FastAPIAccessLogger"]
- class FastAPIAccessLogger:
- def __init__(self, hostname: str, gateway_override: Optional[Gateway] = None):
- self.origin = f"{hostname}-{os.getpid()}"
- self.gateway_override = gateway_override
- @property
- def gateway(self) -> Gateway:
- return self.gateway_override or inject.instance(FluentbitGateway)
- async def __call__(
- self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
- ) -> Response:
- time_received = time.time()
- response = await call_next(request)
- request_time = time.time() - time_received
-
-
- if response.background is None:
- response.background = BackgroundTasks()
- response.background.add_task(
- log_access, self.gateway, request, response, time_received, request_time
- )
- return response
- async def log_access(
- gateway: Gateway,
- request: Request,
- response: Response,
- time_received: float,
- request_time: float,
- ) -> None:
- """
- Create a dictionary with logging data.
- """
- try:
- content_length = int(response.headers.get("content-length"))
- except (TypeError, ValueError):
- content_length = None
- try:
- view_name = request.scope["route"].name
- except KeyError:
- view_name = None
- item = {
- "tag_suffix": "access_log",
- "remote_address": getattr(request.client, "host", None),
- "method": request.method,
- "path": request.url.path,
- "portal": request.url.netloc,
- "referer": request.headers.get("referer"),
- "user_agent": request.headers.get("user-agent"),
- "query_params": request.url.query,
- "view_name": view_name,
- "status": response.status_code,
- "content_type": response.headers.get("content-type"),
- "content_length": content_length,
- "time": time_received,
- "request_time": request_time,
- "correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
- }
- await gateway.add(item)
|