fluentbit_gateway.py 493 B

12345678910111213141516171819
  1. # (c) Nelen & Schuurmans
  2. from asgiref.sync import sync_to_async
  3. from fluent.sender import FluentSender
  4. from clean_python import Gateway
  5. from clean_python import Json
  6. __all__ = ["FluentbitGateway"]
  7. class FluentbitGateway(Gateway):
  8. def __init__(self, tag: str, host: str, port: int):
  9. self._sender = FluentSender(tag, host=host, port=port)
  10. @sync_to_async
  11. def add(self, item: Json) -> Json:
  12. self._sender.emit(item.pop("tag_suffix", ""), item)
  13. return item