# dramatiq_profiler.py
  1. # (c) Nelen & Schuurmans
  2. from pathlib import Path
  3. import dramatiq
  4. import yappi
  5. __all__ = ["ProfilerMiddleware"]
  6. PROFILE_DIR = "var"
  7. class ProfilerMiddleware(dramatiq.Middleware):
  8. """For usage with dramatiq (single-threaded only)"""
  9. def __init__(self, profile_dir: Path):
  10. profile_dir.mkdir(exist_ok=True)
  11. self.profile_dir = profile_dir
  12. def before_process_message(self, broker, message):
  13. yappi.set_clock_type("wall")
  14. yappi.start()
  15. def after_process_message(
  16. self, broker, message: dramatiq.Message, *, result=None, exception=None
  17. ):
  18. yappi.stop()
  19. stats = yappi.convert2pstats(yappi.get_func_stats())
  20. stats.dump_stats(
  21. self.profile_dir / f"{message.actor_name}-{message.message_id}.pstats"
  22. )
  23. yappi.clear_stats()