kubernetes.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. from pathlib import Path
  2. from celery import bootsteps
  3. from celery.signals import worker_ready
  4. from celery.signals import worker_shutdown
  5. __all__ = ["setup_kubernetes_probes"]
  6. HEARTBEAT_FILE = Path("/dev/shm/worker_heartbeat")
  7. READINESS_FILE = Path("/dev/shm/worker_ready")
  8. def register_readiness(**_):
  9. READINESS_FILE.touch()
  10. def unregister_readiness(**_):
  11. READINESS_FILE.unlink(missing_ok=True)
  12. class LivenessProbe(bootsteps.StartStopStep):
  13. requires = {"celery.worker.components:Timer"}
  14. def __init__(self, worker, **kwargs):
  15. self.requests = []
  16. self.tref = None
  17. def start(self, worker):
  18. self.tref = worker.timer.call_repeatedly(
  19. 1.0,
  20. self.update_heartbeat_file,
  21. (worker,),
  22. priority=10,
  23. )
  24. def stop(self, worker):
  25. HEARTBEAT_FILE.unlink(missing_ok=True)
  26. def update_heartbeat_file(self, worker):
  27. HEARTBEAT_FILE.touch()
  28. def setup_kubernetes_probes(app):
  29. worker_ready.connect(register_readiness)
  30. worker_shutdown.connect(unregister_readiness)
  31. app.steps["worker"].add(LivenessProbe)