async_actor.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. # -*- coding: utf-8 -*-
  2. # (c) Nelen & Schuurmans
  3. """Dramatiq configuration"""
  4. import asyncio
  5. import logging
  6. import threading
  7. import time
  8. from concurrent.futures import TimeoutError
  9. from typing import Any
  10. from typing import Awaitable
  11. from typing import Dict
  12. from typing import Optional
  13. from typing import TypeVar
  14. import dramatiq
  15. from asgiref.sync import sync_to_async
  16. from dramatiq.brokers.stub import StubBroker
  17. from dramatiq.middleware import Interrupt
  18. from dramatiq.middleware import Middleware
# Names exported by `from <module> import *`.
__all__ = ["AsyncActor", "AsyncMiddleware", "async_actor"]
# Module-level logger, used by EventLoopThread to report loop start/stop.
logger = logging.getLogger(__name__)
# Default broker (for testing)
# NOTE: this runs at import time and installs a StubBroker as the global
# dramatiq broker.
broker = StubBroker()
# Fallback runner: execute each coroutine to completion with asyncio.run.
# AsyncMiddleware.before_worker_boot overwrites this attribute on the broker it
# is booted with, so that coroutines go to its event loop thread instead.
broker.run_coroutine = lambda coro: asyncio.run(coro)
dramatiq.set_broker(broker)
# Generic result type of the awaitables passed to `run_coroutine`.
R = TypeVar("R")
  26. class EventLoopThread(threading.Thread):
  27. """A thread that starts / stops an asyncio event loop.
  28. The method 'run_coroutine' should be used to run coroutines from a
  29. synchronous context.
  30. """
  31. EVENT_LOOP_START_TIMEOUT = 0.1 # seconds to wait for the event loop to start
  32. loop: Optional[asyncio.AbstractEventLoop] = None
  33. def __init__(self):
  34. super().__init__(target=self._start_event_loop)
  35. def _start_event_loop(self):
  36. """This method should run in the thread"""
  37. logger.info("Starting the event loop...")
  38. self.loop = asyncio.new_event_loop()
  39. try:
  40. self.loop.run_forever()
  41. finally:
  42. self.loop.close()
  43. def _stop_event_loop(self):
  44. """This method should run outside of the thread"""
  45. if self.loop is not None:
  46. logger.info("Stopping the event loop...")
  47. self.loop.call_soon_threadsafe(self.loop.stop)
  48. def run_coroutine(self, coro: Awaitable[R]) -> R:
  49. """To be called from outside the thread
  50. Blocks until the coroutine is finished.
  51. """
  52. if self.loop is None or not self.loop.is_running():
  53. raise RuntimeError("The event loop is not running")
  54. done = threading.Event()
  55. async def wrapped_coro() -> R:
  56. try:
  57. return await coro
  58. finally:
  59. done.set()
  60. future = asyncio.run_coroutine_threadsafe(wrapped_coro(), self.loop)
  61. try:
  62. while True:
  63. try:
  64. # Use a timeout to be able to catch asynchronously raised dramatiq
  65. # exceptions (Shutdown and TimeLimitExceeded).
  66. return future.result(timeout=1)
  67. except TimeoutError:
  68. continue
  69. except Interrupt:
  70. self.loop.call_soon_threadsafe(future.cancel)
  71. # The future will raise a CancelledError *before* the coro actually
  72. # finished cleanup. Wait for the event instead.
  73. done.wait()
  74. raise
  75. def start(self, *args, **kwargs):
  76. super().start(*args, **kwargs)
  77. time.sleep(self.EVENT_LOOP_START_TIMEOUT)
  78. if self.loop is None or not self.loop.is_running():
  79. logger.exception("The event loop failed to start")
  80. logger.info("Event loop is running.")
  81. def join(self, *args, **kwargs):
  82. self._stop_event_loop()
  83. return super().join(*args, **kwargs)
  84. class AsyncMiddleware(Middleware):
  85. """This middleware enables coroutines to be ran as dramatiq a actors.
  86. At its core, this middleware spins up a dedicated thread ('event_loop_thread'),
  87. which may be used to schedule the coroutines on from the worker threads.
  88. """
  89. event_loop_thread: Optional[EventLoopThread] = None
  90. def run_coroutine(self, coro: Awaitable[R]) -> R:
  91. assert self.event_loop_thread is not None
  92. return self.event_loop_thread.run_coroutine(coro)
  93. def before_worker_boot(self, broker, worker):
  94. self.event_loop_thread = EventLoopThread()
  95. self.event_loop_thread.start()
  96. broker.run_coroutine = self.run_coroutine
  97. def after_worker_shutdown(self, broker, worker):
  98. assert self.event_loop_thread is not None
  99. self.event_loop_thread.join()
  100. self.event_loop_thread = None
  101. delattr(broker, "run_coroutine")
  102. class AsyncActor(dramatiq.Actor):
  103. """To configure coroutines as a dramatiq actor.
  104. Requires AsyncMiddleware to be active.
  105. Example usage:
  106. >>> @dramatiq.actor(..., actor_class=AsyncActor)
  107. ... async def my_task(x):
  108. ... print(x)
  109. Notes:
  110. The async functions are scheduled on an event loop that is shared between
  111. worker threads. See AsyncMiddleware.
  112. This is compatible with ShutdownNotifications ("notify_shutdown") and
  113. TimeLimit ("time_limit"). Both result in an asyncio.CancelledError raised inside
  114. the async function. There is currently no way to tell the two apart.
  115. """
  116. def __init__(self, fn, *args, **kwargs):
  117. super().__init__(
  118. lambda *args, **kwargs: self.broker.run_coroutine(fn(*args, **kwargs)),
  119. *args,
  120. **kwargs,
  121. )
  122. @sync_to_async
  123. def send_async(self, *args, **kwargs) -> dramatiq.Message[R]:
  124. """See dramatiq.actor.Actor.send.
  125. Sending a message to a broker is potentially blocking, so @sync_to_async is used.
  126. """
  127. return super().send(*args, **kwargs)
  128. @sync_to_async
  129. def send_async_with_options(
  130. self,
  131. *,
  132. args: tuple = (),
  133. kwargs: Optional[Dict[str, Any]] = None,
  134. delay: Optional[int] = None,
  135. **options,
  136. ) -> dramatiq.Message[R]:
  137. """See dramatiq.actor.Actor.send_with_options.
  138. Sending a message to a broker is potentially blocking, so @sync_to_async is used.
  139. """
  140. return super().send_with_options(
  141. args=args, kwargs=kwargs, delay=delay, **options
  142. )
  143. def async_actor(awaitable=None, **kwargs):
  144. kwargs.setdefault("max_retries", 0)
  145. if awaitable:
  146. return dramatiq.actor(awaitable, actor_class=AsyncActor, **kwargs)
  147. else:
  148. def wrapper(awaitable):
  149. return dramatiq.actor(awaitable, actor_class=AsyncActor, **kwargs)
  150. return wrapper