fluentbit_gateway.py 536 B

123456789101112131415161718192021
  1. # -*- coding: utf-8 -*-
  2. # (c) Nelen & Schuurmans
  3. from typing import Any, Dict
  4. from asgiref.sync import sync_to_async
  5. from fluent.sender import FluentSender
  6. from clean_python.base.infrastructure.gateway import Gateway
  7. Json = Dict[str, Any]
  8. class FluentbitGateway(Gateway):
  9. def __init__(self, tag: str, host: str, port: int):
  10. self._sender = FluentSender(tag, host=host, port=port)
  11. @sync_to_async
  12. def add(self, item: Json) -> Json:
  13. self._sender.emit(item.pop("tag_suffix", ""), item)
  14. return item