1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- # (c) Nelen & Schuurmans
- import json
- import uuid
- from typing import Optional
- import pika
- from asgiref.sync import sync_to_async
- from pydantic import AnyUrl
- from clean_python import Gateway
- from clean_python import Json
- from clean_python import ValueObject
- __all__ = ["CeleryRmqBroker"]
- class CeleryHeaders(ValueObject):
- lang: str = "py"
- task: str
- id: uuid.UUID
- root_id: uuid.UUID
- parent_id: Optional[uuid.UUID] = None
- group: Optional[uuid.UUID] = None
- argsrepr: Optional[str] = None
- kwargsrepr: Optional[str] = None
- origin: Optional[str] = None
- def json_dict(self):
- return json.loads(self.model_dump_json())
- class CeleryRmqBroker(Gateway):
- def __init__(
- self, broker_url: AnyUrl, queue: str, origin: str, declare_queue: bool = False
- ):
- self._parameters = pika.URLParameters(str(broker_url))
- self._queue = queue
- self._origin = origin
- self._declare_queue = declare_queue
- @sync_to_async
- def add(self, item: Json) -> Json:
- task = item["task"]
- args = list(item.get("args") or [])
- kwargs = dict(item.get("kwargs") or {})
- task_id = uuid.uuid4()
- header = CeleryHeaders(
- task=task,
- id=task_id,
- root_id=task_id,
- argsrepr=json.dumps(args),
- kwargsrepr=json.dumps(kwargs),
- origin=self._origin,
- )
- body = json.dumps((args, kwargs, None))
- with pika.BlockingConnection(self._parameters) as connection:
- channel = connection.channel()
- if self._declare_queue:
- channel.queue_declare(queue=self._queue)
- else:
- pass # Configured by Lizard
- properties = pika.BasicProperties(
- correlation_id=str(task_id),
- content_type="application/json",
- content_encoding="utf-8",
- headers=header.json_dict(),
- )
- channel.basic_publish(
- exchange="",
- routing_key=self._queue,
- body=body,
- properties=properties,
- )
- return item
|