dramatiq_task_logger.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import os
  2. import threading
  3. import time
  4. from typing import Optional
  5. import inject
  6. from dramatiq import get_encoder, Middleware
  7. from dramatiq.errors import RateLimitExceeded, Retry
  8. from dramatiq.message import Message
  9. from dramatiq.middleware import SkipMessage
  10. from clean_python.fluentbit.fluentbit_gateway import FluentbitGateway
  11. from clean_python.base.infrastructure.gateway import Gateway
  12. __all__ = ["AsyncLoggingMiddleware"]
  class AsyncLoggingMiddleware(Middleware):
      """Dramatiq middleware that reports the outcome of every message.

      Each hook builds a coroutine on the wrapped ``DramatiqTaskLogger`` and
      hands it to ``broker.run_coroutine``; the broker is therefore expected
      to provide a ``run_coroutine`` method.
      """

      def __init__(self, **kwargs):
          """Create the task logger, forwarding all keyword arguments to it."""
          self.logger = DramatiqTaskLogger(**kwargs)

      def before_process_message(self, broker, message):
          """Record the start time just before the message is processed."""
          submit = broker.run_coroutine
          submit(self.logger.start())

      def after_process_message(self, broker, message, *, result=None, exception=None):
          """Log the processed message together with its result or exception."""
          submit = broker.run_coroutine
          submit(self.logger.stop(message, result, exception))

      def after_skip_message(self, broker, message):
          """Log a skipped message.

          A fresh ``SkipMessage`` instance is passed as the exception and the
          result is ``None``.
          """
          submit = broker.run_coroutine
          submit(self.logger.stop(message, None, SkipMessage()))
  class DramatiqTaskLogger:
      """Build a task-log record for a dramatiq message and send it to a gateway.

      ``start()`` stores a timestamp and ``stop()`` turns the message, its
      result and its exception into a dictionary that is passed to
      ``gateway.add``.
      """

      # Class-level thread-local storage (shared by all instances); it holds
      # the ``start_time`` written by ``start()``.
      # NOTE(review): which thread executes the coroutines depends on
      # ``broker.run_coroutine`` - confirm that start/stop of one message see
      # the same thread-local value.
      local = threading.local()

      def __init__(
          self,
          hostname: str,
          gateway_override: Optional[Gateway] = None,
      ):
          """Store the origin label and the optional gateway override.

          Args:
              hostname: Combined with the current process id to form
                  ``origin`` (``"<hostname>-<pid>"``).
              gateway_override: Gateway to use instead of the injected
                  ``FluentbitGateway``.
          """
          self.gateway_override = gateway_override
          self.origin = "{}-{}".format(hostname, os.getpid())

      @property
      def gateway(self):
          """Return the override when it is truthy, else the injected gateway."""
          override = self.gateway_override
          if override:
              return override
          return inject.instance(FluentbitGateway)

      @property
      def encoder(self):
          """Return dramatiq's current encoder (looked up on every access)."""
          return get_encoder()

      @staticmethod
      def _state_for(exception):
          """Translate the exception of a message into a task state label."""
          if exception is None:
              return "SUCCESS"
          # Order matters: the first matching exception type wins.
          labelled_types = (
              (Retry, "RETRY"),
              (SkipMessage, "EXPIRED"),
              (RateLimitExceeded, "TERMINATED"),
          )
          for exc_type, label in labelled_types:
              if isinstance(exception, exc_type):
                  return label
          return "FAILURE"

      async def start(self):
          """Remember the current time as the start of message processing."""
          now = time.time()
          self.local.start_time = now

      async def stop(self, message: Message, result=None, exception=None):
          """Assemble the log record for ``message`` and add it to the gateway.

          Args:
              message: The dramatiq message that was processed or skipped.
              result: The result of the message, if any.
              exception: The exception of the message, if any; it determines
                  the ``state`` field.

          Returns:
              Whatever ``gateway.add`` returns for the record.
          """
          state = self._state_for(exception)

          finished = time.time()
          try:
              started = self.local.start_time
          except AttributeError:
              # No start time is stored: report a zero duration.
              duration = 0
          else:
              duration = finished - started

          record = {
              "tag_suffix": "task_log",
              "task_id": message.message_id,
              "name": message.actor_name,
              "state": state,
              "duration": duration,
          }
          record["retries"] = message.options.get("retries", 0)
          record["origin"] = self.origin
          record["argsrepr"] = self.encoder.encode(message.args)
          record["kwargsrepr"] = self.encoder.encode(message.kwargs)
          record["result"] = result
          return await self.gateway.add(record)