fluentbit_gateway.py 554 B

12345678910111213141516171819202122
  1. # -*- coding: utf-8 -*-
  2. # (c) Nelen & Schuurmans
  3. from typing import Any
  4. from typing import Dict
  5. from asgiref.sync import sync_to_async
  6. from fluent.sender import FluentSender
  7. from clean_python.base.infrastructure.gateway import Gateway
  8. Json = Dict[str, Any]
  9. class FluentbitGateway(Gateway):
  10. def __init__(self, tag: str, host: str, port: int):
  11. self._sender = FluentSender(tag, host=host, port=port)
  12. @sync_to_async
  13. def add(self, item: Json) -> Json:
  14. self._sender.emit(item.pop("tag_suffix", ""), item)
  15. return item