123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- from pathlib import Path
- from celery import bootsteps
- from celery.signals import worker_ready
- from celery.signals import worker_shutdown
# Only the setup entry point is exported by ``from <module> import *``.
__all__ = ["setup_kubernetes_probes"]

# Marker files written under /dev/shm for the probes to inspect.
# READINESS_FILE is created/removed by the readiness signal handlers;
# HEARTBEAT_FILE is touched periodically by the liveness bootstep.
READINESS_FILE: Path = Path("/dev/shm/worker_ready")
HEARTBEAT_FILE: Path = Path("/dev/shm/worker_heartbeat")
def register_readiness(**_signal_kwargs):
    """Create ``READINESS_FILE`` to mark the worker as ready.

    Accepts and ignores arbitrary keyword arguments so it can be used
    directly as a signal receiver.
    """
    # exist_ok=True is the pathlib default: an existing file just gets its
    # modification time refreshed instead of raising.
    READINESS_FILE.touch(exist_ok=True)
def unregister_readiness(**_signal_kwargs):
    """Remove ``READINESS_FILE`` so the worker no longer reports as ready.

    Accepts and ignores arbitrary keyword arguments so it can be used
    directly as a signal receiver. A missing file is not an error.
    """
    try:
        READINESS_FILE.unlink()
    except FileNotFoundError:
        # Already gone (or never created) -- nothing to clean up.
        pass
class LivenessProbe(bootsteps.StartStopStep):
    """Worker bootstep that keeps ``HEARTBEAT_FILE`` fresh while running.

    On ``start`` a repeating timer entry is registered that touches
    ``HEARTBEAT_FILE`` once per second; on ``stop`` the timer entry is
    cancelled and the file is removed.

    NOTE(review): how the external liveness check consumes the file (for
    example by comparing its modification time against a threshold) is not
    defined here -- confirm against the probe configuration.
    """

    # The step uses ``worker.timer``, so the Timer component must be set up
    # before this step is started.
    requires = {"celery.worker.components:Timer"}

    def __init__(self, worker, **kwargs):
        """Initialise the step without scheduling anything yet.

        Args:
            worker: The worker instance this step is attached to.
            **kwargs: Extra bootstep options, forwarded to the base class.
        """
        super().__init__(worker, **kwargs)
        # Not read anywhere in this class; kept so the instance attributes
        # stay the same for any external code that inspects them.
        self.requests = []
        # Handle of the repeating timer entry; set in start(), cleared in
        # stop().
        self.tref = None

    def start(self, worker):
        """Schedule ``update_heartbeat_file`` to run every 1.0 seconds."""
        self.tref = worker.timer.call_repeatedly(
            1.0,
            self.update_heartbeat_file,
            (worker,),
            priority=10,
        )

    def stop(self, worker):
        """Cancel the heartbeat timer and remove ``HEARTBEAT_FILE``."""
        # Cancel first: otherwise a pending tick could touch the file again
        # after it was unlinked, leaving a heartbeat behind for a stopped
        # worker.
        if self.tref is not None:
            self.tref.cancel()
            self.tref = None
        HEARTBEAT_FILE.unlink(missing_ok=True)

    def update_heartbeat_file(self, worker):
        """Timer callback: refresh ``HEARTBEAT_FILE`` (creating it if absent)."""
        HEARTBEAT_FILE.touch()
def setup_kubernetes_probes(app):
    """Wire the readiness and liveness probe files into a Celery app.

    Connects the readiness handlers to the worker lifecycle signals and
    registers ``LivenessProbe`` as a worker bootstep on ``app``.

    Args:
        app: The Celery application whose worker steps are extended.
    """
    # Readiness: create the marker once the worker is ready, remove it on
    # shutdown.
    signal_handlers = (
        (worker_ready, register_readiness),
        (worker_shutdown, unregister_readiness),
    )
    for signal, handler in signal_handlers:
        signal.connect(handler)

    # Liveness: the bootstep touches the heartbeat file on a timer.
    worker_steps = app.steps["worker"]
    worker_steps.add(LivenessProbe)
|