Capturing logs in tests

InMemorySink is a ring-buffer sink designed for tests. Combined with scope_sinks (per spec §6.2), it captures only the records emitted inside a block so tests can assert on the captured records without interference from sibling tests.

Step 1. Capture records for one test

from dagstack.logger import InMemorySink, Logger


def test_order_placement_logs_audit_event():
    sink = InMemorySink(capacity=100)
    logger = Logger.get("order_service.checkout")

    with logger.scope_sinks([sink]):
        place_order(order_id=1234, user_id=42)

    records = sink.records()
    audit = next(r for r in records if r.body == "order placed")
    assert audit.severity_text == "INFO"
    assert audit.attributes["order.id"] == 1234
    assert audit.attributes["user.id"] == 42

The scoped block swaps sinks on the original logger; any other module that resolves the same logger via Logger.get("order_service.checkout") while the block is active also writes into sink. On exit, the previous sinks are restored.
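The swap-and-restore behaviour described above can be illustrated with a toy model. This is a minimal sketch, not dagstack's implementation: ToyLogger and ListSink are hypothetical stand-ins that only mimic the semantics (one shared logger object per name, sinks replaced inside the block, previous sinks restored on exit even if the block raises).

```python
from contextlib import contextmanager


class ListSink:
    """Hypothetical stand-in for a sink: stores emitted messages in a list."""

    def __init__(self):
        self.messages = []

    def emit(self, message):
        self.messages.append(message)


class ToyLogger:
    """Hypothetical stand-in for a named logger; NOT the dagstack Logger."""

    _registry = {}

    def __init__(self, name):
        self.name = name
        self.sinks = []

    @classmethod
    def get(cls, name):
        # The same name always resolves to the same logger object.
        if name not in cls._registry:
            cls._registry[name] = cls(name)
        return cls._registry[name]

    def info(self, message):
        for sink in self.sinks:
            sink.emit(message)

    @contextmanager
    def scope_sinks(self, sinks):
        previous = self.sinks
        self.sinks = list(sinks)
        try:
            yield self
        finally:
            # Restore the previous sinks even if the block raised.
            self.sinks = previous


default_sink = ListSink()
ToyLogger.get("order_service.checkout").sinks = [default_sink]

test_sink = ListSink()
with ToyLogger.get("order_service.checkout").scope_sinks([test_sink]):
    # Another module resolving the same name writes into the scoped sink.
    ToyLogger.get("order_service.checkout").info("order placed")

# After the block, the previous sinks are active again.
ToyLogger.get("order_service.checkout").info("after scope")
```

Because both lookups return the same object, the record emitted inside the block reaches only the scoped sink, and the later record reaches only the restored one.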

Step 2. Reusable test fixture

Pytest test suites typically wrap the pattern in a fixture. The fixture creates a fresh InMemorySink, captures records during the test, and is torn down automatically:

import pytest
from dagstack.logger import InMemorySink, Logger


@pytest.fixture
def captured_logs():
    """Capture records emitted by the `order_service` logger during the test."""
    sink = InMemorySink(capacity=1000)
    logger = Logger.get("order_service")
    with logger.scope_sinks([sink]):
        yield sink


def test_audit_trail(captured_logs):
    place_order(order_id=1234, user_id=42)
    records = captured_logs.records()
    assert len(records) == 1
    assert records[0].attributes["order.id"] == 1234

Step 3. Asserting on attributes

Records are typed objects (not JSON strings), so attribute access is direct. Examples of common assertions:

def test_redaction_masks_api_keys(captured_logs):
    Logger.get("order_service").info("authenticated", attributes={
        "user.id": 42,
        "api_key": "sk-supersecret",
    })

    record = captured_logs.records()[0]
    assert record.attributes["user.id"] == 42
    assert record.attributes["api_key"] == "***"


def test_only_one_error_emitted(captured_logs):
    run_business_logic()
    errors = [r for r in captured_logs.records() if r.severity_text == "ERROR"]
    assert len(errors) == 1
    assert errors[0].attributes["exception.type"] == "OrderValidationError"


def test_trace_context_propagated(captured_logs):
    with open_otel_span(name="checkout", trace_id=expected_trace_id):
        Logger.get("order_service").info("inside span")

    record = captured_logs.records()[0]
    assert record.trace_id == expected_trace_id
    assert record.span_id is not None
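Repeated list comprehensions over records can be folded into a small helper. The following is a sketch under assumptions: FakeRecord is a hypothetical stand-in that mirrors only the fields used on this page (body, severity_text, attributes), and find_records is an illustrative helper, not part of dagstack.logger.

```python
from dataclasses import dataclass, field


@dataclass
class FakeRecord:
    """Hypothetical stand-in mirroring the record fields used on this page."""
    body: str
    severity_text: str = "INFO"
    attributes: dict = field(default_factory=dict)


def find_records(records, severity=None, attributes=None):
    """Return records matching the severity and every given attribute."""
    wanted = attributes or {}
    matches = []
    for record in records:
        if severity is not None and record.severity_text != severity:
            continue
        # A missing key never matches, even when the wanted value is None.
        if any(key not in record.attributes or record.attributes[key] != value
               for key, value in wanted.items()):
            continue
        matches.append(record)
    return matches


records = [
    FakeRecord("order placed", "INFO", {"order.id": 1234, "user.id": 42}),
    FakeRecord("validation failed", "ERROR",
               {"exception.type": "OrderValidationError"}),
    FakeRecord("order placed", "INFO", {"order.id": 5678, "user.id": 42}),
]

errors = find_records(records, severity="ERROR")
order_1234 = find_records(records, attributes={"order.id": 1234})
```

In a real test, the hand-built list would be replaced by captured_logs.records().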

Step 4. Resetting between assertions

InMemorySink.clear() empties the buffer for the next assertion in the same test:

def test_phase_separation(captured_logs):
    run_phase_one()
    assert any(r.body == "phase 1 complete" for r in captured_logs.records())

    captured_logs.clear()

    run_phase_two()
    assert all(r.attributes.get("phase") == "two" for r in captured_logs.records())

Step 5. Avoiding capacity overflow

The default capacity=1000 works for most tests. For load tests or batch operations that emit thousands of records, raise the capacity or assert incrementally while the operation runs, before older records are evicted:

def test_high_volume():
    # This test scopes its own, larger sink on the "indexer" logger,
    # so the captured_logs fixture is not needed here.
    sink = InMemorySink(capacity=10_000)
    with Logger.get("indexer").scope_sinks([sink]):
        index_repository()

    assert len(sink.records()) <= sink.capacity
    assert any(r.attributes.get("event.name") == "completed" for r in sink.records())

When capacity is exceeded, the oldest records are dropped, so sink.records() never holds more than the most recent capacity records. This matches production behaviour for InMemorySink and avoids unbounded memory growth in long-running tests.
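The drop-oldest behaviour can be reproduced with a bounded deque from the Python standard library. This only illustrates the eviction semantics described above; whether InMemorySink uses a deque internally is not stated here.

```python
from collections import deque

capacity = 3
# Bounded buffer: appending to a full deque evicts the oldest item.
buffer = deque(maxlen=capacity)

for i in range(1, 6):
    buffer.append(f"record {i}")

# Only the newest `capacity` items remain, in emission order.
retained = list(buffer)
```

A test that needs the first record of a long run therefore has to size the capacity to the full expected volume, or check early records before they are evicted.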

Common pitfalls

  • Forgetting to wrap the test in scope_sinks. Records leak to the global sinks (typically ConsoleSink in pytest -s), and sink.records() is empty even though the code emitted lines.
  • Asserting on the timestamp. time_unix_nano is a wall-clock value in nanoseconds; assertions should use a range (abs(record.time_unix_nano - now) < 5_000_000_000, with now taken from time.time_ns(), which allows 5 seconds of drift) or skip the field entirely.
  • Capturing the wrong logger. Logger.get("order_service") and Logger.get("order_service.checkout") are different loggers. A scope on the parent captures records emitted via the parent or its children, but not via a sibling such as Logger.get("order_service_audit").
  • Multi-threaded tests. InMemorySink is thread-safe (an internal lock guards both emit and records()), but the scoped override applies to the scope owner's thread context. If a worker thread started inside the test emits after the scope exits, those records land back in the global sinks, so join worker threads before leaving the block.
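For the timestamp pitfall, one option is to bracket the call under test with time.time_ns() readings and check that the record's timestamp falls between them. This is a sketch, assuming time_unix_nano holds nanoseconds since the Unix epoch as its name suggests; StampedRecord, emit_record, and timestamp_within are hypothetical stand-ins for the captured record, the code under test, and a test helper.

```python
import time
from dataclasses import dataclass


@dataclass
class StampedRecord:
    """Hypothetical stand-in for a captured record with a timestamp."""
    body: str
    time_unix_nano: int


def emit_record(body):
    """Hypothetical code under test: stamps the record with wall-clock time."""
    return StampedRecord(body=body, time_unix_nano=time.time_ns())


def timestamp_within(record, before_ns, after_ns, slack_ns=1_000_000_000):
    """True if the timestamp lies in [before, after], widened by slack_ns.

    The slack tolerates small wall-clock adjustments during the test.
    """
    return before_ns - slack_ns <= record.time_unix_nano <= after_ns + slack_ns


before = time.time_ns()
record = emit_record("order placed")
after = time.time_ns()

recent = timestamp_within(record, before, after)
```

Bracketing ties the tolerance to how long the call actually took rather than to a fixed guess, which keeps the assertion meaningful on slow CI machines.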

See also