12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- # (c) Nelen & Schuurmans
- import time
- from typing import Tuple
- from asgiref.sync import sync_to_async
- from fluent.sender import FluentSender
- from clean_python import Gateway
- from clean_python import Json
- from clean_python import SyncGateway
- __all__ = ["FluentbitGateway", "SyncFluentbitGateway"]
- def unpack_item(item: Json) -> Tuple[str, float, Json]:
- data = item.copy()
- label = data.pop("tag_suffix", "")
- timestamp = data.pop("time", None)
- if timestamp is None:
- timestamp = time.time()
- return label, timestamp, data
- class SyncFluentbitGateway(SyncGateway):
- def __init__(self, tag: str, host: str, port: int):
- self._sender = FluentSender(
- tag, host=host, port=port, nanosecond_precision=True
- )
- def add(self, item: Json):
- label, timestamp, data = unpack_item(item)
- self._sender.emit_with_time(label, timestamp, data)
- return {**data, "time": timestamp, "tag_suffix": label}
- class FluentbitGateway(Gateway):
- def __init__(self, tag: str, host: str, port: int):
- self._sync_gateway = SyncFluentbitGateway(tag, host, port)
- @sync_to_async
- def _add(self, item: Json) -> Json:
- return self._sync_gateway.add(item)
- async def add(self, item: Json) -> Json:
- return await self._add(item)
|