1234567891011121314151617181920212223242526272829303132333435 |
- # (c) Nelen & Schuurmans
- from pathlib import Path
- import dramatiq
- import yappi
- __all__ = ["ProfilerMiddleware"]
- PROFILE_DIR = "var"
- class ProfilerMiddleware(dramatiq.Middleware):
- """For usage with dramatiq (single-threaded only)"""
- def __init__(self, profile_dir: Path):
- profile_dir.mkdir(exist_ok=True)
- self.profile_dir = profile_dir
- def before_process_message(self, broker, message):
- yappi.set_clock_type("wall")
- yappi.start()
- def after_process_message(
- self, broker, message: dramatiq.Message, *, result=None, exception=None
- ):
- yappi.stop()
- stats = yappi.convert2pstats(yappi.get_func_stats())
- stats.dump_stats(
- self.profile_dir / f"{message.actor_name}-{message.message_id}.pstats"
- )
- yappi.clear_stats()
|