Capturing logs in tests
InMemorySink is a ring-buffer sink designed for tests. Combined with scope_sinks (per spec §6.2), it captures only the records emitted inside a block so tests can assert on the captured records without interference from sibling tests.
Step 1. Capture records for one test
- Python
- TypeScript
- Go
from dagstack.logger import InMemorySink, Logger
def test_order_placement_logs_audit_event():
    """Placing an order emits an INFO-level audit record with order/user ids."""
    sink = InMemorySink(capacity=100)
    logger = Logger.get("order_service.checkout")
    # Only records emitted while the scope is active land in `sink`.
    with logger.scope_sinks([sink]):
        place_order(order_id=1234, user_id=42)
    captured = sink.records()
    audit = next(rec for rec in captured if rec.body == "order placed")
    assert audit.severity_text == "INFO"
    assert audit.attributes["order.id"] == 1234
    assert audit.attributes["user.id"] == 42
import { describe, expect, it } from "vitest";
import { InMemorySink, Logger } from "@dagstack/logger";
describe("order placement", () => {
it("logs an audit event", async () => {
const sink = new InMemorySink({ capacity: 100 });
const logger = Logger.get("order_service.checkout");
await logger.scopeSinks([sink], async () => {
await placeOrder(1234, 42);
});
const records = sink.records();
const audit = records.find((r) => r.body === "order placed");
expect(audit?.severity_text).toBe("INFO");
expect(audit?.attributes["order.id"]).toBe(1234);
expect(audit?.attributes["user.id"]).toBe(42);
});
});
// TestOrderPlacementLogsAuditEvent verifies that placing an order emits an
// INFO-level audit record carrying the order id.
func TestOrderPlacementLogsAuditEvent(t *testing.T) {
	sink := logger.NewInMemorySink(100, 1)
	checkout := logger.Get("order_service.checkout")
	// Only records emitted while the scope is active are captured by sink.
	err := checkout.ScopeSinks(context.Background(), []logger.Sink{sink}, func(ctx context.Context) error {
		placeOrder(ctx, 1234, 42)
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	var audit *logger.LogRecord
	for _, rec := range sink.Records() {
		if rec.Body == "order placed" {
			audit = rec
			break
		}
	}
	if audit == nil {
		t.Fatal("audit record not captured")
	}
	if audit.SeverityText != logger.SeverityTextInfo {
		t.Errorf("severity_text = %q, want INFO", audit.SeverityText)
	}
	if audit.Attributes["order.id"] != 1234 {
		t.Errorf("order.id = %v", audit.Attributes["order.id"])
	}
}
The scoped block swaps sinks on the original logger; any other module that resolves the same logger via Logger.get("order_service.checkout") while the block is active also writes into sink. On exit, the previous sinks are restored.
Step 2. Reusable test fixture
Pytest test suites typically wrap the pattern in a fixture. The fixture creates a fresh InMemorySink, captures records during the test, and is torn down automatically:
- Python
- TypeScript
- Go
import pytest
from dagstack.logger import InMemorySink, Logger
@pytest.fixture
def captured_logs():
    """Yield an InMemorySink capturing `order_service` records for one test.

    The sink is scoped onto the logger for the test's duration; the previous
    sinks are restored automatically when the `with` block unwinds at teardown.
    """
    capture = InMemorySink(capacity=1000)
    with Logger.get("order_service").scope_sinks([capture]):
        yield capture
def test_audit_trail(captured_logs):
    """A single order produces exactly one record carrying the order id."""
    place_order(order_id=1234, user_id=42)
    emitted = captured_logs.records()
    assert len(emitted) == 1
    assert emitted[0].attributes["order.id"] == 1234
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { InMemorySink, Logger } from "@dagstack/logger";
/**
* Vitest fixture: capture records emitted by the `order_service` logger
* for the duration of each test in the suite.
*/
/**
 * Vitest fixture: capture records emitted by the `order_service` logger
 * for the duration of each test in the suite.
 *
 * Teardown restores the sinks that were in effect before the suite ran
 * (matching the Go helper's EffectiveSinks/SetSinks round-trip) instead of
 * leaving the logger with no sinks at all.
 */
describe("audit trail", () => {
  let sink: InMemorySink;
  let restoreSinks: () => void;
  beforeEach(() => {
    sink = new InMemorySink({ capacity: 1000 });
    const logger = Logger.get("order_service");
    // NOTE(review): assumes a TS counterpart of Go's EffectiveSinks() exists
    // under this name — confirm against the @dagstack/logger API.
    const previous = logger.effectiveSinks();
    logger.setSinks([sink]);
    restoreSinks = () => logger.setSinks(previous);
  });
  afterEach(() => {
    sink.clear();
    // Restore the previous sinks rather than clobbering them with [].
    restoreSinks();
  });
  it("captures one record per order", async () => {
    await placeOrder(1234, 42);
    const records = sink.records();
    expect(records.length).toBe(1);
    expect(records[0].attributes["order.id"]).toBe(1234);
  });
});
// captureLogs scopes a fresh InMemorySink onto the "order_service" logger
// for the duration of the test and registers a t.Cleanup that restores the
// previously effective sinks.
func captureLogs(t *testing.T) *logger.InMemorySink {
	t.Helper()
	capture := logger.NewInMemorySink(1000, 1)
	target := logger.Get("order_service")
	// Remember what was installed so cleanup can put it back.
	previous := target.EffectiveSinks()
	target.SetSinks([]logger.Sink{capture})
	t.Cleanup(func() { target.SetSinks(previous) })
	return capture
}
// TestAuditTrail checks that one order yields exactly one captured record.
func TestAuditTrail(t *testing.T) {
	sink := captureLogs(t)
	placeOrder(context.Background(), 1234, 42)
	got := sink.Records()
	if n := len(got); n != 1 {
		t.Fatalf("len(records) = %d, want 1", n)
	}
	if v := got[0].Attributes["order.id"]; v != 1234 {
		t.Errorf("order.id = %v", v)
	}
}
Step 3. Asserting on attributes
Records are typed objects (not JSON strings), so attribute access is direct. Examples of common assertions:
- Python
- TypeScript
- Go
def test_redaction_masks_api_keys(captured_logs):
    """Sensitive attributes are masked; non-sensitive ones pass through."""
    Logger.get("order_service").info("authenticated", attributes={
        "user.id": 42,
        "api_key": "sk-supersecret",
    })
    first = captured_logs.records()[0]
    assert first.attributes["user.id"] == 42
    assert first.attributes["api_key"] == "***"
def test_only_one_error_emitted(captured_logs):
    """Business-logic failure surfaces as exactly one ERROR record."""
    run_business_logic()
    errors = [rec for rec in captured_logs.records()
              if rec.severity_text == "ERROR"]
    assert len(errors) == 1
    assert errors[0].attributes["exception.type"] == "OrderValidationError"
def test_trace_context_propagated(captured_logs):
    """Records emitted inside an OTel span carry that span's trace context."""
    with open_otel_span(name="checkout", trace_id=expected_trace_id):
        Logger.get("order_service").info("inside span")
    emitted = captured_logs.records()[0]
    assert emitted.trace_id == expected_trace_id
    assert emitted.span_id is not None
it("masks api keys", () => {
  // Redaction replaces the secret value; other attributes pass through.
  Logger.get("order_service").info("authenticated", {
    "user.id": 42,
    api_key: "sk-supersecret",
  });
  const [first] = sink.records();
  expect(first.attributes["user.id"]).toBe(42);
  expect(first.attributes.api_key).toBe("***");
});
it("emits exactly one error", async () => {
  await runBusinessLogic();
  const errorRecords = sink
    .records()
    .filter((r) => r.severity_text === "ERROR");
  expect(errorRecords).toHaveLength(1);
  expect(errorRecords[0].attributes["exception.type"]).toBe("OrderValidationError");
});
// TestRedactionMasksAPIKeys checks that secret attributes are replaced with
// the redaction placeholder while other attributes are untouched.
func TestRedactionMasksAPIKeys(t *testing.T) {
	sink := captureLogs(t)
	logger.Get("order_service").Info("authenticated", logger.Attrs{
		"user.id": 42,
		"api_key": "sk-supersecret",
	})
	rec := sink.Records()[0]
	if got := rec.Attributes["api_key"]; got != logger.RedactedPlaceholder {
		t.Errorf("api_key = %v, want %q", got, logger.RedactedPlaceholder)
	}
	if got := rec.Attributes["user.id"]; got != 42 {
		t.Errorf("user.id = %v", got)
	}
}
// TestOnlyOneErrorEmitted checks that the business logic emits exactly one
// ERROR record tagged with the expected exception type.
func TestOnlyOneErrorEmitted(t *testing.T) {
	sink := captureLogs(t)
	runBusinessLogic()
	var errors []*logger.LogRecord
	for _, r := range sink.Records() {
		if r.SeverityText == logger.SeverityTextError {
			errors = append(errors, r)
		}
	}
	if len(errors) != 1 {
		t.Fatalf("len(errors) = %d, want 1", len(errors))
	}
	// Assert the actual value, matching the Python/TypeScript variants,
	// instead of only checking the attribute is present.
	if got := errors[0].Attributes["exception.type"]; got != "OrderValidationError" {
		t.Errorf("exception.type = %v, want OrderValidationError", got)
	}
}
Step 4. Resetting between assertions
InMemorySink.clear() empties the buffer for the next assertion in the same test:
- Python
- TypeScript
- Go
def test_phase_separation(captured_logs):
    """clear() resets the buffer between phases within a single test."""
    run_phase_one()
    bodies = [rec.body for rec in captured_logs.records()]
    assert "phase 1 complete" in bodies
    # Empty the buffer so the phase-two assertion sees only new records.
    captured_logs.clear()
    run_phase_two()
    assert all(rec.attributes.get("phase") == "two"
               for rec in captured_logs.records())
it("separates phases", async () => {
  await runPhaseOne();
  const bodies = sink.records().map((r) => r.body);
  expect(bodies).toContain("phase 1 complete");
  // Reset the buffer so phase-two assertions see only phase-two records.
  sink.clear();
  await runPhaseTwo();
  expect(sink.records().every((r) => r.attributes.phase === "two")).toBe(true);
});
// TestPhaseSeparation verifies Clear() resets the buffer between phases.
func TestPhaseSeparation(t *testing.T) {
	sink := captureLogs(t)
	runPhaseOne()
	phaseOneSeen := false
	for _, rec := range sink.Records() {
		if rec.Body == "phase 1 complete" {
			phaseOneSeen = true
			break
		}
	}
	if !phaseOneSeen {
		t.Fatal("phase 1 record missing")
	}
	// Empty the buffer so the loop below sees only phase-two records.
	sink.Clear()
	runPhaseTwo()
	for _, rec := range sink.Records() {
		if rec.Attributes["phase"] != "two" {
			t.Errorf("phase = %v", rec.Attributes["phase"])
		}
	}
}
Step 5. Avoiding capacity overflow
The default capacity=1000 works for most tests. For load tests or batch operations that emit thousands of records, raise the cap or assert in real time:
- Python
- TypeScript
- Go
def test_high_volume():
    """High-volume runs need a larger ring buffer than the fixture default.

    Note: the `captured_logs` fixture is intentionally not requested — this
    test builds its own larger sink and scopes the `indexer` logger, so the
    `order_service` fixture would only add unused setup.
    """
    sink = InMemorySink(capacity=10_000)
    with Logger.get("indexer").scope_sinks([sink]):
        index_repository()
    # The ring buffer never holds more than `capacity` records.
    assert len(sink.records()) <= sink.capacity
    assert any(r.attributes.get("event.name") == "completed" for r in sink.records())
it("captures high-volume indexing", async () => {
const sink = new InMemorySink({ capacity: 10_000 });
const logger = Logger.get("indexer");
await logger.scopeSinks([sink], async () => {
await indexRepository();
});
expect(sink.records().length).toBeLessThanOrEqual(sink.capacity);
expect(sink.records().some((r) => r.attributes["event.name"] === "completed")).toBe(true);
});
// TestHighVolume verifies the ring buffer respects its capacity under load
// and that the completion event survives any eviction of older records.
func TestHighVolume(t *testing.T) {
	sink := logger.NewInMemorySink(10_000, 1)
	log := logger.Get("indexer")
	err := log.ScopeSinks(context.Background(), []logger.Sink{sink}, func(ctx context.Context) error {
		indexRepository(ctx)
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	records := sink.Records()
	if len(records) > sink.Capacity() {
		t.Errorf("len(records) = %d > capacity %d", len(records), sink.Capacity())
	}
	// Match the Python/TypeScript variants: the "completed" event must be
	// among the most recent records the buffer retained.
	completed := false
	for _, r := range records {
		if r.Attributes["event.name"] == "completed" {
			completed = true
			break
		}
	}
	if !completed {
		t.Error(`no record with event.name = "completed" captured`)
	}
}
When capacity is exceeded, the oldest records are dropped — sink.records() always reflects the most recent up to capacity. This matches production behaviour for InMemorySink and avoids unbounded memory growth in long-running tests.
Common pitfalls
- Forgetting to wrap the test in `scope_sinks`. Records leak to the global sinks (typically `ConsoleSink` in `pytest -s`), and `sink.records()` is empty even though the code emitted lines.
- Asserting on the timestamp. `time_unix_nano` is a wall-clock value; assertions should use ranges (`abs(record.time_unix_nano - now) < 5_000_000_000`) or skip the field entirely.
- Capturing the wrong logger. `Logger.get("order_service")` and `Logger.get("order_service.checkout")` are different loggers. The scope on the parent captures records emitted via the parent or its children — but not via a sibling like `Logger.get("order_service_audit")`.
- Multi-threaded tests. `InMemorySink` is thread-safe (it uses an internal lock for both `emit` and `records()`), but the scoped override applies to the scope owner's thread context. If a worker thread inside the test runs after the scope exits, its emits land back in the global sinks.
See also
- Scoped overrides — full mechanics of `scope_sinks`.
- Sinks → InMemorySink — protocol and capacity semantics.
- LogRecord fields — what the typed snapshot looks like.