testing.py 713 B

12345678910111213141516171819202122232425262728293031
  1. # (c) Nelen & Schuurmans
  2. import asyncio
  3. from dramatiq.middleware import SkipMessage
  4. from .async_actor import async_actor
  5. __all__ = ["sleep_task"]
  6. @async_actor(
  7. retry_when=lambda x, y: isinstance(y, KeyError),
  8. max_retries=1,
  9. )
  10. async def sleep_task(seconds: int, return_value=None, event="success"):
  11. event = event.lower()
  12. if event == "success":
  13. await asyncio.sleep(int(seconds))
  14. elif event == "crash":
  15. import ctypes
  16. ctypes.string_at(0) # segfault
  17. elif event == "skip":
  18. raise SkipMessage("skipping")
  19. elif event == "retry":
  20. raise KeyError("will-retry")
  21. else:
  22. raise ValueError(f"Unknown event '{event}'")
  23. return return_value