fluentbit_gateway.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. # (c) Nelen & Schuurmans
  2. import time
  3. from typing import Tuple
  4. from asgiref.sync import sync_to_async
  5. from fluent.sender import FluentSender
  6. from clean_python import Gateway
  7. from clean_python import Json
  8. from clean_python import SyncGateway
  9. __all__ = ["FluentbitGateway", "SyncFluentbitGateway"]
  10. def unpack_item(item: Json) -> Tuple[str, float, Json]:
  11. data = item.copy()
  12. label = data.pop("tag_suffix", "")
  13. timestamp = data.pop("time", None)
  14. if timestamp is None:
  15. timestamp = time.time()
  16. return label, timestamp, data
  17. class SyncFluentbitGateway(SyncGateway):
  18. def __init__(self, tag: str, host: str, port: int):
  19. self._sender = FluentSender(
  20. tag, host=host, port=port, nanosecond_precision=True
  21. )
  22. def add(self, item: Json):
  23. label, timestamp, data = unpack_item(item)
  24. self._sender.emit_with_time(label, timestamp, data)
  25. return {**data, "time": timestamp, "tag_suffix": label}
  26. class FluentbitGateway(Gateway):
  27. def __init__(self, tag: str, host: str, port: int):
  28. self._sync_gateway = SyncFluentbitGateway(tag, host, port)
  29. @sync_to_async
  30. def _add(self, item: Json) -> Json:
  31. return self._sync_gateway.add(item)
  32. async def add(self, item: Json) -> Json:
  33. return await self._add(item)