Просмотр исходного кода

Add correlation_id to context to follow distributed tasks (#27)

Casper van der Wel 1 год назад
Родитель
Сommit
8815a2acb2

+ 7 - 1
CHANGES.md

@@ -4,7 +4,13 @@
 0.6.10 (unreleased)
 -------------------
 
-- Nothing changed yet.
+- Add correlation_id to logging and accept X-Correlation-Id header in
+  fastapi service.
+
+- Add `SyncFluentbitGateway`.
+
+- Log the nanosecond-precision "time" instead of the second-precision logtime
+  in `[Sync]FluentbitGateway`.
 
 
 0.6.9 (2023-10-11)

+ 12 - 0
clean_python/base/domain/context.py

@@ -4,6 +4,7 @@ import os
 from contextvars import ContextVar
 from typing import FrozenSet
 from typing import Optional
+from uuid import UUID
 
 from pydantic import AnyUrl
 from pydantic import FileUrl
@@ -45,6 +46,9 @@ class Context:
         self._tenant_value: ContextVar[Optional[Tenant]] = ContextVar(
             "tenant_value", default=None
         )
+        self._correlation_id_value: ContextVar[Optional[UUID]] = ContextVar(
+            "correlation_id", default=None
+        )
 
     @property
     def path(self) -> AnyUrl:
@@ -70,5 +74,13 @@ class Context:
     def tenant(self, value: Optional[Tenant]) -> None:
         self._tenant_value.set(value)
 
+    @property
+    def correlation_id(self) -> Optional[UUID]:
+        return self._correlation_id_value.get()
+
+    @correlation_id.setter
+    def correlation_id(self, value: Optional[UUID]) -> None:
+        self._correlation_id_value.set(value)
+
 
 ctx = Context()

+ 8 - 0
clean_python/dramatiq/dramatiq_task_logger.py

@@ -13,6 +13,7 @@ from dramatiq.errors import Retry
 from dramatiq.message import Message
 from dramatiq.middleware import SkipMessage
 
+from clean_python import ctx
 from clean_python import Gateway
 from clean_python.fluentbit import FluentbitGateway
 
@@ -72,6 +73,11 @@ class DramatiqTaskLogger:
         except AttributeError:
             duration = 0
 
+        try:
+            start_time = self.local.start_time
+        except AttributeError:
+            start_time = None
+
         log_dict = {
             "tag_suffix": "task_log",
             "task_id": message.message_id,
@@ -83,5 +89,7 @@ class DramatiqTaskLogger:
             "argsrepr": self.encoder.encode(message.args),
             "kwargsrepr": self.encoder.encode(message.kwargs),
             "result": result,
+            "time": start_time,
+            "correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
         }
         return await self.gateway.add(log_dict)

+ 3 - 6
clean_python/fastapi/fastapi_access_logger.py

@@ -2,7 +2,6 @@
 
 import os
 import time
-from datetime import datetime
 from typing import Awaitable
 from typing import Callable
 from typing import Optional
@@ -12,6 +11,7 @@ from starlette.background import BackgroundTasks
 from starlette.requests import Request
 from starlette.responses import Response
 
+from clean_python import ctx
 from clean_python import Gateway
 from clean_python.fluentbit import FluentbitGateway
 
@@ -44,10 +44,6 @@ class FastAPIAccessLogger:
         return response
 
 
-def fmt_timestamp(timestamp: float) -> str:
-    return datetime.utcfromtimestamp(timestamp).isoformat() + "Z"
-
-
 async def log_access(
     gateway: Gateway,
     request: Request,
@@ -81,7 +77,8 @@ async def log_access(
         "status": response.status_code,
         "content_type": response.headers.get("content-type"),
         "content_length": content_length,
-        "time": fmt_timestamp(time_received),
+        "time": time_received,
         "request_time": request_time,
+        "correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
     }
     await gateway.add(item)

+ 9 - 1
clean_python/fastapi/service.py

@@ -6,9 +6,12 @@ from typing import Dict
 from typing import List
 from typing import Optional
 from typing import Set
+from uuid import UUID
+from uuid import uuid4
 
 from fastapi import Depends
 from fastapi import FastAPI
+from fastapi import Header
 from fastapi import Request
 from fastapi.exceptions import RequestValidationError
 from starlette.types import ASGIApp
@@ -62,10 +65,15 @@ def get_auth_kwargs(auth_client: Optional[OAuth2SPAClientSettings]) -> Dict[str,
         }
 
 
-async def set_context(request: Request, token: Token = Depends(get_token)) -> None:
+async def set_context(
+    request: Request,
+    token: Token = Depends(get_token),
+    x_correlation_id: UUID = Header(default_factory=uuid4),
+) -> None:
     ctx.path = request.url
     ctx.user = token.user
     ctx.tenant = token.tenant
+    ctx.correlation_id = x_correlation_id
 
 
 async def health_check():

+ 32 - 5
clean_python/fluentbit/fluentbit_gateway.py

@@ -1,19 +1,46 @@
 # (c) Nelen & Schuurmans
 
+import time
+from typing import Tuple
+
 from asgiref.sync import sync_to_async
 from fluent.sender import FluentSender
 
 from clean_python import Gateway
 from clean_python import Json
+from clean_python import SyncGateway
+
+__all__ = ["FluentbitGateway", "SyncFluentbitGateway"]
+
+
+def unpack_item(item: Json) -> Tuple[str, float, Json]:
+    data = item.copy()
+    label = data.pop("tag_suffix", "")
+    timestamp = data.pop("time", None)
+    if timestamp is None:
+        timestamp = time.time()
+    return label, timestamp, data
 
-__all__ = ["FluentbitGateway"]
+
+class SyncFluentbitGateway(SyncGateway):
+    def __init__(self, tag: str, host: str, port: int):
+        self._sender = FluentSender(
+            tag, host=host, port=port, nanosecond_precision=True
+        )
+
+    def add(self, item: Json):
+        label, timestamp, data = unpack_item(item)
+        self._sender.emit_with_time(label, timestamp, data)
+        return {**data, "time": timestamp, "tag_suffix": label}
 
 
 class FluentbitGateway(Gateway):
     def __init__(self, tag: str, host: str, port: int):
-        self._sender = FluentSender(tag, host=host, port=port)
+        self._sync_gateway = SyncFluentbitGateway(tag, host, port)
 
     @sync_to_async
-    def add(self, item: Json) -> Json:
-        self._sender.emit(item.pop("tag_suffix", ""), item)
-        return item
+    def _add(self, item: Json) -> Json:
+        return self._sync_gateway.add(item)
+
+    async def add(self, item: Json) -> Json:
+        return await self._add(item)

+ 17 - 3
tests/fastapi/test_fastapi_access_logger.py

@@ -1,4 +1,5 @@
 from unittest import mock
+from uuid import uuid4
 
 import pytest
 from fastapi.routing import APIRoute
@@ -6,6 +7,7 @@ from starlette.requests import Request
 from starlette.responses import JSONResponse
 from starlette.responses import StreamingResponse
 
+from clean_python import ctx
 from clean_python import InMemoryGateway
 from clean_python.fastapi import FastAPIAccessLogger
 
@@ -67,8 +69,18 @@ def call_next(response):
     return func
 
 
+@pytest.fixture
+def correlation_id():
+    uid = uuid4()
+    ctx.correlation_id = uid
+    yield uid
+    ctx.correlation_id = None
+
+
 @mock.patch("time.time", return_value=0.0)
-async def test_logging(time, fastapi_access_logger, req, response, call_next):
+async def test_logging(
+    time, fastapi_access_logger, req, response, call_next, correlation_id
+):
     await fastapi_access_logger(req, call_next)
     assert len(fastapi_access_logger.gateway.data) == 0
     await response.background()
@@ -87,8 +99,9 @@ async def test_logging(time, fastapi_access_logger, req, response, call_next):
         "status": 200,
         "content_type": "application/json",
         "content_length": 13,
-        "time": "1970-01-01T00:00:00Z",
+        "time": 0.0,
         "request_time": 0.0,
+        "correlation_id": str(correlation_id),
     }
 
 
@@ -149,6 +162,7 @@ async def test_logging_minimal(
         "status": 200,
         "content_type": "text/html; charset=utf-8",
         "content_length": None,
-        "time": "1970-01-01T00:00:00Z",
+        "time": 0.0,
         "request_time": 0.0,
+        "correlation_id": None,
     }

+ 70 - 0
tests/fastapi/test_service_context.py

@@ -0,0 +1,70 @@
+from http import HTTPStatus
+from uuid import UUID
+from uuid import uuid4
+
+import pytest
+from fastapi.testclient import TestClient
+
+from clean_python import ctx
+from clean_python import InMemoryGateway
+from clean_python.fastapi import get
+from clean_python.fastapi import Resource
+from clean_python.fastapi import Service
+from clean_python.fastapi import v
+
+
+class FooResource(Resource, version=v(1), name="testing"):
+    @get("/context")
+    def context(self):
+        return {
+            "path": str(ctx.path),
+            "user": ctx.user,
+            "tenant": ctx.tenant,
+            "correlation_id": str(ctx.correlation_id),
+        }
+
+
+@pytest.fixture
+def app():
+    return Service(FooResource()).create_app(
+        title="test",
+        description="testing",
+        hostname="testserver",
+        access_logger_gateway=InMemoryGateway([]),
+    )
+
+
+@pytest.fixture
+def client(app):
+    return TestClient(app)
+
+
+def test_default_context(app, client: TestClient):
+    response = client.get(app.url_path_for("v1/context"))
+
+    assert response.status_code == HTTPStatus.OK
+
+    body = response.json()
+
+    assert body["path"] == "http://testserver/v1/context"
+    assert body["user"] == {"id": "DEV", "name": "dev"}
+    assert body["tenant"] is None
+    UUID(body["correlation_id"])  # randomly generated uuid
+
+    assert ctx.correlation_id is None
+
+
+def test_x_correlation_id_header(app, client: TestClient):
+    uid = str(uuid4())
+    response = client.get(
+        app.url_path_for("v1/context"),
+        headers={"X-Correlation-Id": uid},
+    )
+
+    assert response.status_code == HTTPStatus.OK
+
+    body = response.json()
+
+    assert body["correlation_id"] == uid
+
+    assert ctx.correlation_id is None

+ 0 - 0
tests/fastapi/test_service.py → tests/fastapi/test_service_init.py


+ 29 - 8
tests/test_dramatiq_task_logger.py

@@ -1,10 +1,12 @@
 import os
 from unittest import mock
+from uuid import uuid4
 
 import pytest
 from dramatiq.errors import Retry
 from dramatiq.message import Message
 
+from clean_python import ctx
 from clean_python import InMemoryGateway
 from clean_python.dramatiq import DramatiqTaskLogger
 
@@ -22,6 +24,20 @@ def task_logger(in_memory_gateway):
     )
 
 
+@pytest.fixture
+def correlation_id():
+    uid = uuid4()
+    ctx.correlation_id = uid
+    yield uid
+    ctx.correlation_id = None
+
+
+@pytest.fixture
+def patched_time():
+    with mock.patch("time.time", side_effect=(0, 123.456)):
+        yield
+
+
 @pytest.fixture
 def message():
     return Message(
@@ -36,32 +52,36 @@ def message():
 
 
 @pytest.fixture
-def expected():
+def expected(correlation_id):
     return {
         "id": 1,
         "tag_suffix": "task_log",
         "task_id": "abc123",
         "name": "my_task",
         "state": "SUCCESS",
-        "duration": 0,
+        "duration": 123.456,
         "retries": 0,
         "origin": f"host-{os.getpid()}",
         "argsrepr": b"[1,2]",
         "kwargsrepr": b'{"foo":"bar"}',
         "result": None,
+        "time": 0.0,
+        "correlation_id": str(correlation_id),
     }
 
 
-@mock.patch("time.time", return_value=123)
-async def test_log_success(time, task_logger, in_memory_gateway, message, expected):
+async def test_log_success(
+    patched_time, task_logger, in_memory_gateway, message, expected
+):
     await task_logger.start()
     await task_logger.stop(message)
 
     assert in_memory_gateway.data[1] == expected
 
 
-@mock.patch("time.time", new=mock.Mock(return_value=123))
-async def test_log_fail(task_logger, in_memory_gateway, message, expected):
+async def test_log_fail(
+    patched_time, task_logger, in_memory_gateway, message, expected
+):
     await task_logger.start()
     await task_logger.stop(message, exception=ValueError("test"))
 
@@ -72,8 +92,9 @@ async def test_log_fail(task_logger, in_memory_gateway, message, expected):
     }
 
 
-@mock.patch("time.time", return_value=123)
-async def test_log_retry(time, task_logger, in_memory_gateway, message, expected):
+async def test_log_retry(
+    patched_time, task_logger, in_memory_gateway, message, expected
+):
     await task_logger.start()
     await task_logger.stop(message, exception=Retry("test"))