| 1234567891011121314151617181920212223242526272829303132333435 | # (c) Nelen & Schuurmansfrom pathlib import Pathimport dramatiqimport yappi__all__ = ["ProfilerMiddleware"]PROFILE_DIR = "var"class ProfilerMiddleware(dramatiq.Middleware):    """For usage with dramatiq (single-threaded only)"""    def __init__(self, profile_dir: Path):        profile_dir.mkdir(exist_ok=True)        self.profile_dir = profile_dir    def before_process_message(self, broker, message):        yappi.set_clock_type("wall")        yappi.start()    def after_process_message(        self, broker, message: dramatiq.Message, *, result=None, exception=None    ):        yappi.stop()        stats = yappi.convert2pstats(yappi.get_func_stats())        stats.dump_stats(            self.profile_dir / f"{message.actor_name}-{message.message_id}.pstats"        )        yappi.clear_stats()
 |