"""File-based liveness and readiness markers for a Celery worker.

The worker touches ``READINESS_FILE`` when Celery emits ``worker_ready`` and
removes it on ``worker_shutdown``. While the worker's bootstep is running it
also touches ``HEARTBEAT_FILE`` once per second. An external checker (for
example a Kubernetes exec probe -- the probe configuration itself is not part
of this module) can inspect the existence or modification time of these files.
"""

from pathlib import Path

from celery import bootsteps
from celery.signals import worker_ready
from celery.signals import worker_shutdown

__all__ = ["setup_kubernetes_probes"]

# Marker files; both paths live under /dev/shm.
HEARTBEAT_FILE = Path("/dev/shm/worker_heartbeat")
READINESS_FILE = Path("/dev/shm/worker_ready")


def register_readiness(**_):
    """Create (or refresh the mtime of) the readiness marker file.

    Connected to the ``worker_ready`` signal; all signal keyword arguments
    are accepted and ignored.
    """
    READINESS_FILE.touch()


def unregister_readiness(**_):
    """Remove the readiness marker file if it exists.

    Connected to the ``worker_shutdown`` signal; all signal keyword arguments
    are accepted and ignored. A missing file is not an error.
    """
    READINESS_FILE.unlink(missing_ok=True)


class LivenessProbe(bootsteps.StartStopStep):
    """Worker bootstep that touches ``HEARTBEAT_FILE`` on a repeating timer."""

    # The step uses ``worker.timer``, so the Timer component must be set up
    # before this step starts.
    requires = {"celery.worker.components:Timer"}

    def __init__(self, worker, **kwargs):
        """Initialise the step with no timer scheduled yet."""
        super().__init__(worker, **kwargs)
        self.requests = []
        # Handle returned by ``worker.timer.call_repeatedly``; ``None`` while
        # no heartbeat timer is scheduled.
        self.tref = None

    def start(self, worker):
        """Schedule ``update_heartbeat_file`` to run every second."""
        self.tref = worker.timer.call_repeatedly(
            1.0,
            self.update_heartbeat_file,
            (worker,),
            priority=10,
        )

    def stop(self, worker):
        """Cancel the heartbeat timer and remove the heartbeat file."""
        # Cancel first so no further ticks are scheduled that would re-create
        # the file after it has been removed below.
        if self.tref is not None:
            self.tref.cancel()
            self.tref = None
        HEARTBEAT_FILE.unlink(missing_ok=True)

    def update_heartbeat_file(self, worker):
        """Touch the heartbeat file, updating its modification time."""
        HEARTBEAT_FILE.touch()


def setup_kubernetes_probes(app):
    """Wire the readiness signal handlers and the liveness bootstep into *app*.

    Connects ``register_readiness`` / ``unregister_readiness`` to Celery's
    ``worker_ready`` / ``worker_shutdown`` signals and adds ``LivenessProbe``
    to the app's worker bootsteps.
    """
    worker_ready.connect(register_readiness)
    worker_shutdown.connect(unregister_readiness)
    app.steps["worker"].add(LivenessProbe)