sleep_task.py 687 B

1234567891011121314151617181920212223242526272829
  1. # (c) Nelen & Schuurmans
  2. import asyncio
  3. from dramatiq.middleware import SkipMessage
  4. from .async_actor import async_actor
  5. @async_actor(
  6. retry_when=lambda x, y: isinstance(y, KeyError),
  7. max_retries=1,
  8. )
  9. async def sleep_task(seconds: int, return_value=None, event="success"):
  10. event = event.lower()
  11. if event == "success":
  12. await asyncio.sleep(int(seconds))
  13. elif event == "crash":
  14. import ctypes
  15. ctypes.string_at(0) # segfault
  16. elif event == "skip":
  17. raise SkipMessage("skipping")
  18. elif event == "retry":
  19. raise KeyError("will-retry")
  20. else:
  21. raise ValueError(f"Unknown event '{event}'")
  22. return return_value