# (c) Nelen & Schuurmans

import time
from typing import Tuple

from asgiref.sync import sync_to_async
from fluent.sender import FluentSender

from clean_python import Gateway
from clean_python import Json
from clean_python import SyncGateway

__all__ = ["FluentbitGateway", "SyncFluentbitGateway"]


def unpack_item(item: Json) -> Tuple[str, float, Json]:
    """Split *item* into its tag suffix, timestamp and remaining payload.

    The input is copied (via ``item.copy()``) before the reserved keys are
    removed, so the caller's object is not modified by the pops below.

    Returns:
        A ``(label, timestamp, data)`` tuple where ``label`` is the value of
        ``"tag_suffix"`` (``""`` when absent), ``timestamp`` is the value of
        ``"time"`` (the current ``time.time()`` when absent or ``None``) and
        ``data`` is the copy without those two keys.
    """
    payload = item.copy()
    suffix = payload.pop("tag_suffix", "")
    stamp = payload.pop("time", None)
    # Only an absent/None timestamp falls back to "now"; 0 is kept as given.
    return suffix, (time.time() if stamp is None else stamp), payload


class SyncFluentbitGateway(SyncGateway):
    """Synchronous gateway that forwards items through a ``FluentSender``."""

    def __init__(self, tag: str, host: str, port: int):
        """Create the underlying sender for *tag* at *host*:*port*.

        The sender is constructed with ``nanosecond_precision=True``.
        """
        self._sender = FluentSender(
            tag,
            host=host,
            port=port,
            nanosecond_precision=True,
        )

    def add(self, item: Json):
        """Emit *item* and return the record that was sent.

        The item is unpacked with ``unpack_item``; the payload is emitted
        under the item's tag suffix at the item's timestamp. The returned
        mapping is the payload with ``"time"`` and ``"tag_suffix"`` added
        back, so defaults filled in during unpacking are visible to the
        caller. The result of ``emit_with_time`` is not inspected.
        """
        suffix, stamp, payload = unpack_item(item)
        self._sender.emit_with_time(suffix, stamp, payload)

        record = {**payload}
        record["time"] = stamp
        record["tag_suffix"] = suffix
        return record


class FluentbitGateway(Gateway):
    """Awaitable counterpart of ``SyncFluentbitGateway``.

    All work is delegated to an internal ``SyncFluentbitGateway``; the
    blocking call is wrapped with ``sync_to_async`` so it can be awaited.
    """

    def __init__(self, tag: str, host: str, port: int):
        """Build the wrapped synchronous gateway with the same arguments."""
        self._sync_gateway = SyncFluentbitGateway(tag, host, port)

    @sync_to_async
    def _add(self, item: Json) -> Json:
        """Delegate to the synchronous gateway's ``add``."""
        return self._sync_gateway.add(item)

    async def add(self, item: Json) -> Json:
        """Emit *item* via the wrapped gateway and return the sent record."""
        sent = await self._add(item)
        return sent