|
@@ -4,13 +4,15 @@ import os
|
|
import threading
|
|
import threading
|
|
import time
|
|
import time
|
|
from typing import Optional
|
|
from typing import Optional
|
|
|
|
+from uuid import UUID
|
|
|
|
+from uuid import uuid4
|
|
|
|
|
|
import inject
|
|
import inject
|
|
from dramatiq import get_encoder
|
|
from dramatiq import get_encoder
|
|
|
|
+from dramatiq import Message
|
|
from dramatiq import Middleware
|
|
from dramatiq import Middleware
|
|
from dramatiq.errors import RateLimitExceeded
|
|
from dramatiq.errors import RateLimitExceeded
|
|
from dramatiq.errors import Retry
|
|
from dramatiq.errors import Retry
|
|
-from dramatiq.message import Message
|
|
|
|
from dramatiq.middleware import SkipMessage
|
|
from dramatiq.middleware import SkipMessage
|
|
|
|
|
|
from clean_python import ctx
|
|
from clean_python import ctx
|
|
@@ -24,14 +26,24 @@ class AsyncLoggingMiddleware(Middleware):
|
|
def __init__(self, **kwargs):
|
|
def __init__(self, **kwargs):
|
|
self.logger = DramatiqTaskLogger(**kwargs)
|
|
self.logger = DramatiqTaskLogger(**kwargs)
|
|
|
|
|
|
|
|
+ def before_enqueue(self, broker, message: Message, delay):
|
|
|
|
+ if ctx.correlation_id is not None:
|
|
|
|
+ message.options["correlation_id"] = str(ctx.correlation_id)
|
|
|
|
+
|
|
def before_process_message(self, broker, message):
|
|
def before_process_message(self, broker, message):
|
|
|
|
+ if message.options.get("correlation_id") is not None:
|
|
|
|
+ ctx.correlation_id = UUID(message.options["correlation_id"])
|
|
|
|
+ else:
|
|
|
|
+ ctx.correlation_id = uuid4()
|
|
broker.run_coroutine(self.logger.start())
|
|
broker.run_coroutine(self.logger.start())
|
|
|
|
|
|
def after_skip_message(self, broker, message):
|
|
def after_skip_message(self, broker, message):
|
|
broker.run_coroutine(self.logger.stop(message, None, SkipMessage()))
|
|
broker.run_coroutine(self.logger.stop(message, None, SkipMessage()))
|
|
|
|
+ ctx.correlation_id = None
|
|
|
|
|
|
def after_process_message(self, broker, message, *, result=None, exception=None):
|
|
def after_process_message(self, broker, message, *, result=None, exception=None):
|
|
broker.run_coroutine(self.logger.stop(message, result, exception))
|
|
broker.run_coroutine(self.logger.stop(message, result, exception))
|
|
|
|
+ ctx.correlation_id = None
|
|
|
|
|
|
|
|
|
|
class DramatiqTaskLogger:
|
|
class DramatiqTaskLogger:
|