async_actor.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. # (c) Nelen & Schuurmans
  2. import asyncio
  3. import logging
  4. import threading
  5. import time
  6. from concurrent.futures import TimeoutError
  7. from typing import Any
  8. from typing import Awaitable
  9. from typing import Dict
  10. from typing import Optional
  11. from typing import TypeVar
  12. import dramatiq
  13. from asgiref.sync import sync_to_async
  14. from dramatiq.brokers.stub import StubBroker
  15. from dramatiq.middleware import Interrupt
  16. from dramatiq.middleware import Middleware
  17. __all__ = ["AsyncActor", "AsyncMiddleware", "async_actor"]
  18. logger = logging.getLogger(__name__)
  19. # Default broker (for testing)
  20. broker = StubBroker()
  21. broker.run_coroutine = lambda coro: asyncio.run(coro)
  22. dramatiq.set_broker(broker)
  23. R = TypeVar("R")
  24. class EventLoopThread(threading.Thread):
  25. """A thread that starts / stops an asyncio event loop.
  26. The method 'run_coroutine' should be used to run coroutines from a
  27. synchronous context.
  28. """
  29. EVENT_LOOP_START_TIMEOUT = 0.1 # seconds to wait for the event loop to start
  30. loop: Optional[asyncio.AbstractEventLoop] = None
  31. def __init__(self):
  32. super().__init__(target=self._start_event_loop)
  33. def _start_event_loop(self):
  34. """This method should run in the thread"""
  35. logger.info("Starting the event loop...")
  36. self.loop = asyncio.new_event_loop()
  37. try:
  38. self.loop.run_forever()
  39. finally:
  40. self.loop.close()
  41. def _stop_event_loop(self):
  42. """This method should run outside of the thread"""
  43. if self.loop is not None:
  44. logger.info("Stopping the event loop...")
  45. self.loop.call_soon_threadsafe(self.loop.stop)
  46. def run_coroutine(self, coro: Awaitable[R]) -> R:
  47. """To be called from outside the thread
  48. Blocks until the coroutine is finished.
  49. """
  50. if self.loop is None or not self.loop.is_running():
  51. raise RuntimeError("The event loop is not running")
  52. done = threading.Event()
  53. async def wrapped_coro() -> R:
  54. try:
  55. return await coro
  56. finally:
  57. done.set()
  58. future = asyncio.run_coroutine_threadsafe(wrapped_coro(), self.loop)
  59. try:
  60. while True:
  61. try:
  62. # Use a timeout to be able to catch asynchronously raised dramatiq
  63. # exceptions (Shutdown and TimeLimitExceeded).
  64. return future.result(timeout=1)
  65. except TimeoutError:
  66. continue
  67. except Interrupt:
  68. self.loop.call_soon_threadsafe(future.cancel)
  69. # The future will raise a CancelledError *before* the coro actually
  70. # finished cleanup. Wait for the event instead.
  71. done.wait()
  72. raise
  73. def start(self, *args, **kwargs):
  74. super().start(*args, **kwargs)
  75. time.sleep(self.EVENT_LOOP_START_TIMEOUT)
  76. if self.loop is None or not self.loop.is_running():
  77. logger.exception("The event loop failed to start")
  78. logger.info("Event loop is running.")
  79. def join(self, *args, **kwargs):
  80. self._stop_event_loop()
  81. return super().join(*args, **kwargs)
  82. class AsyncMiddleware(Middleware):
  83. """This middleware enables coroutines to be ran as dramatiq a actors.
  84. At its core, this middleware spins up a dedicated thread ('event_loop_thread'),
  85. which may be used to schedule the coroutines on from the worker threads.
  86. """
  87. event_loop_thread: Optional[EventLoopThread] = None
  88. def run_coroutine(self, coro: Awaitable[R]) -> R:
  89. assert self.event_loop_thread is not None
  90. return self.event_loop_thread.run_coroutine(coro)
  91. def before_worker_boot(self, broker, worker):
  92. self.event_loop_thread = EventLoopThread()
  93. self.event_loop_thread.start()
  94. broker.run_coroutine = self.run_coroutine
  95. def after_worker_shutdown(self, broker, worker):
  96. assert self.event_loop_thread is not None
  97. self.event_loop_thread.join()
  98. self.event_loop_thread = None
  99. delattr(broker, "run_coroutine")
  100. class AsyncActor(dramatiq.Actor):
  101. """To configure coroutines as a dramatiq actor.
  102. Requires AsyncMiddleware to be active.
  103. Example usage:
  104. >>> @dramatiq.actor(..., actor_class=AsyncActor)
  105. ... async def my_task(x):
  106. ... print(x)
  107. Notes:
  108. The async functions are scheduled on an event loop that is shared between
  109. worker threads. See AsyncMiddleware.
  110. This is compatible with ShutdownNotifications ("notify_shutdown") and
  111. TimeLimit ("time_limit"). Both result in an asyncio.CancelledError raised inside
  112. the async function. There is currently no way to tell the two apart.
  113. """
  114. def __init__(self, fn, *args, **kwargs):
  115. super().__init__(
  116. lambda *args, **kwargs: self.broker.run_coroutine(fn(*args, **kwargs)),
  117. *args,
  118. **kwargs,
  119. )
  120. @sync_to_async
  121. def send_async(self, *args, **kwargs) -> dramatiq.Message[R]:
  122. """See dramatiq.actor.Actor.send.
  123. Sending a message to a broker is potentially blocking, so @sync_to_async is used.
  124. """
  125. return super().send(*args, **kwargs)
  126. @sync_to_async
  127. def send_async_with_options(
  128. self,
  129. *,
  130. args: tuple = (), # type: ignore
  131. kwargs: Optional[Dict[str, Any]] = None,
  132. delay: Optional[int] = None,
  133. **options,
  134. ) -> dramatiq.Message[R]:
  135. """See dramatiq.actor.Actor.send_with_options.
  136. Sending a message to a broker is potentially blocking, so @sync_to_async is used.
  137. """
  138. return super().send_with_options(
  139. args=args, kwargs=kwargs, delay=delay, **options
  140. )
  141. def async_actor(awaitable=None, **kwargs):
  142. kwargs.setdefault("max_retries", 0)
  143. if awaitable:
  144. return dramatiq.actor(awaitable, actor_class=AsyncActor, **kwargs)
  145. else:
  146. def wrapper(awaitable):
  147. return dramatiq.actor(awaitable, actor_class=AsyncActor, **kwargs)
  148. return wrapper