Захват логов в тестах
InMemorySink — приёмник на основе кольцевого буфера, спроектированный для тестов. В сочетании с scope_sinks (по спеке §6.2) он захватывает только записи, отправленные внутри блока, чтобы тесты могли делать assertion'ы по захваченным записям без помех от соседних тестов.
Шаг 1. Захват записей одного теста
- Python
- TypeScript
- Go
from dagstack.logger import InMemorySink, Logger
def test_order_placement_logs_audit_event():
sink = InMemorySink(capacity=100)
logger = Logger.get("order_service.checkout")
with logger.scope_sinks([sink]):
place_order(order_id=1234, user_id=42)
records = sink.records()
audit = next(r for r in records if r.body == "order placed")
assert audit.severity_text == "INFO"
assert audit.attributes["order.id"] == 1234
assert audit.attributes["user.id"] == 42
import { describe, expect, it } from "vitest";
import { InMemorySink, Logger } from "@dagstack/logger";
describe("order placement", () => {
it("logs an audit event", async () => {
const sink = new InMemorySink({ capacity: 100 });
const logger = Logger.get("order_service.checkout");
await logger.scopeSinks([sink], async () => {
await placeOrder(1234, 42);
});
const records = sink.records();
const audit = records.find((r) => r.body === "order placed");
expect(audit?.severity_text).toBe("INFO");
expect(audit?.attributes["order.id"]).toBe(1234);
expect(audit?.attributes["user.id"]).toBe(42);
});
});
func TestOrderPlacementLogsAuditEvent(t *testing.T) {
sink := logger.NewInMemorySink(100, 1)
log := logger.Get("order_service.checkout")
err := log.ScopeSinks(context.Background(), []logger.Sink{sink}, func(ctx context.Context) error {
placeOrder(ctx, 1234, 42)
return nil
})
if err != nil {
t.Fatal(err)
}
records := sink.Records()
var audit *logger.LogRecord
for _, r := range records {
if r.Body == "order placed" {
audit = r
break
}
}
if audit == nil {
t.Fatal("audit-запись не захвачена")
}
if audit.SeverityText != logger.SeverityTextInfo {
t.Errorf("severity_text = %q, want INFO", audit.SeverityText)
}
if audit.Attributes["order.id"] != 1234 {
t.Errorf("order.id = %v", audit.Attributes["order.id"])
}
}
Scoped-блок подменяет приёмники на исходном логгере; любой другой модуль, разрешающий тот же логгер через Logger.get("order_service.checkout") пока активен блок, тоже пишет в sink. На выходе предыдущие приёмники восстанавливаются.
Шаг 2. Переиспользуемая фикстура
Pytest-сьюты обычно оборачивают этот паттерн в фикстуру. Фикстура создаёт свежий InMemorySink, захватывает записи во время теста и автоматически разбирается:
- Python
- TypeScript
- Go
import pytest
from dagstack.logger import InMemorySink, Logger
@pytest.fixture
def captured_logs():
"""Захват записей, отправленных логгером `order_service` во время теста."""
sink = InMemorySink(capacity=1000)
logger = Logger.get("order_service")
with logger.scope_sinks([sink]):
yield sink
def test_audit_trail(captured_logs):
place_order(order_id=1234, user_id=42)
records = captured_logs.records()
assert len(records) == 1
assert records[0].attributes["order.id"] == 1234
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { InMemorySink, Logger } from "@dagstack/logger";
/**
* Vitest-фикстура: захват записей логгера `order_service`
* на время каждого теста сьюта.
*/
describe("audit trail", () => {
let sink: InMemorySink;
beforeEach(() => {
sink = new InMemorySink({ capacity: 1000 });
const logger = Logger.get("order_service");
logger.setSinks([sink]);
});
afterEach(() => {
sink.clear();
Logger.get("order_service").setSinks([]);
});
it("captures one record per order", async () => {
await placeOrder(1234, 42);
const records = sink.records();
expect(records.length).toBe(1);
expect(records[0].attributes["order.id"]).toBe(1234);
});
});
// captureLogs — t.Cleanup-ориентированный хелпер: scope'ит свежий InMemorySink
// на "order_service" для теста и восстанавливает исходные приёмники в cleanup.
func captureLogs(t *testing.T) *logger.InMemorySink {
t.Helper()
sink := logger.NewInMemorySink(1000, 1)
log := logger.Get("order_service")
prev := log.EffectiveSinks()
log.SetSinks([]logger.Sink{sink})
t.Cleanup(func() {
log.SetSinks(prev)
})
return sink
}
func TestAuditTrail(t *testing.T) {
sink := captureLogs(t)
placeOrder(context.Background(), 1234, 42)
records := sink.Records()
if len(records) != 1 {
t.Fatalf("len(records) = %d, want 1", len(records))
}
if records[0].Attributes["order.id"] != 1234 {
t.Errorf("order.id = %v", records[0].Attributes["order.id"])
}
}
Шаг 3. Assertion'ы по атрибутам
Записи — типизированные объекты (а не JSON-строки), так что доступ к атрибутам прямой. Примеры распространённых проверок:
- Python
- TypeScript
- Go
def test_redaction_masks_api_keys(captured_logs):
Logger.get("order_service").info("authenticated", attributes={
"user.id": 42,
"api_key": "sk-supersecret",
})
record = captured_logs.records()[0]
assert record.attributes["user.id"] == 42
assert record.attributes["api_key"] == "***"
def test_only_one_error_emitted(captured_logs):
run_business_logic()
errors = [r for r in captured_logs.records() if r.severity_text == "ERROR"]
assert len(errors) == 1
assert errors[0].attributes["exception.type"] == "OrderValidationError"
def test_trace_context_propagated(captured_logs):
with open_otel_span(name="checkout", trace_id=expected_trace_id):
Logger.get("order_service").info("inside span")
record = captured_logs.records()[0]
assert record.trace_id == expected_trace_id
assert record.span_id is not None
it("masks api keys", () => {
Logger.get("order_service").info("authenticated", {
"user.id": 42,
api_key: "sk-supersecret",
});
const record = sink.records()[0];
expect(record.attributes["user.id"]).toBe(42);
expect(record.attributes.api_key).toBe("***");
});
it("emits exactly one error", async () => {
await runBusinessLogic();
const errors = sink.records().filter((r) => r.severity_text === "ERROR");
expect(errors).toHaveLength(1);
expect(errors[0].attributes["exception.type"]).toBe("OrderValidationError");
});
func TestRedactionMasksAPIKeys(t *testing.T) {
sink := captureLogs(t)
logger.Get("order_service").Info("authenticated", logger.Attrs{
"user.id": 42,
"api_key": "sk-supersecret",
})
record := sink.Records()[0]
if record.Attributes["api_key"] != logger.RedactedPlaceholder {
t.Errorf("api_key = %v, want %q", record.Attributes["api_key"], logger.RedactedPlaceholder)
}
if record.Attributes["user.id"] != 42 {
t.Errorf("user.id = %v", record.Attributes["user.id"])
}
}
func TestOnlyOneErrorEmitted(t *testing.T) {
sink := captureLogs(t)
runBusinessLogic()
var errors []*logger.LogRecord
for _, r := range sink.Records() {
if r.SeverityText == logger.SeverityTextError {
errors = append(errors, r)
}
}
if len(errors) != 1 {
t.Fatalf("len(errors) = %d, want 1", len(errors))
}
if errors[0].Attributes["exception.type"] == nil {
t.Errorf("exception.type отсутствует")
}
}
Шаг 4. Сброс между assertion'ами
InMemorySink.clear() опустошает буфер для следующей проверки в том же тесте:
- Python
- TypeScript
- Go
def test_phase_separation(captured_logs):
run_phase_one()
assert any(r.body == "phase 1 complete" for r in captured_logs.records())
captured_logs.clear()
run_phase_two()
assert all(r.attributes.get("phase") == "two" for r in captured_logs.records())
it("separates phases", async () => {
await runPhaseOne();
expect(sink.records().some((r) => r.body === "phase 1 complete")).toBe(true);
sink.clear();
await runPhaseTwo();
expect(sink.records().every((r) => r.attributes.phase === "two")).toBe(true);
});
func TestPhaseSeparation(t *testing.T) {
sink := captureLogs(t)
runPhaseOne()
found := false
for _, r := range sink.Records() {
if r.Body == "phase 1 complete" {
found = true
break
}
}
if !found {
t.Fatal("запись phase 1 отсутствует")
}
sink.Clear()
runPhaseTwo()
for _, r := range sink.Records() {
if r.Attributes["phase"] != "two" {
t.Errorf("phase = %v", r.Attributes["phase"])
}
}
}
Шаг 5. Избегание переполнения capacity
Дефолтный capacity=1000 подходит для большинства тестов. Для нагрузочных тестов или batch-операций, отправляющих тысячи записей, либо подними cap, либо проверяй в реальном времени:
- Python
- TypeScript
- Go
def test_high_volume(captured_logs):
sink = InMemorySink(capacity=10_000)
with Logger.get("indexer").scope_sinks([sink]):
index_repository()
assert len(sink.records()) <= sink.capacity
assert any(r.attributes.get("event.name") == "completed" for r in sink.records())
it("captures high-volume indexing", async () => {
const sink = new InMemorySink({ capacity: 10_000 });
const logger = Logger.get("indexer");
await logger.scopeSinks([sink], async () => {
await indexRepository();
});
expect(sink.records().length).toBeLessThanOrEqual(sink.capacity);
expect(sink.records().some((r) => r.attributes["event.name"] === "completed")).toBe(true);
});
func TestHighVolume(t *testing.T) {
sink := logger.NewInMemorySink(10_000, 1)
log := logger.Get("indexer")
err := log.ScopeSinks(context.Background(), []logger.Sink{sink}, func(ctx context.Context) error {
indexRepository(ctx)
return nil
})
if err != nil {
t.Fatal(err)
}
records := sink.Records()
if len(records) > sink.Capacity() {
t.Errorf("len(records) = %d > capacity %d", len(records), sink.Capacity())
}
}
При превышении capacity отбрасываются самые старые записи — sink.records() всегда отражает самые свежие до capacity. Это совпадает с production-поведением InMemorySink и предотвращает неограниченный рост памяти в долгих тестах.
Распространённые ошибки
- Забыли обернуть тест в
scope_sinks. Записи утекают в глобальные приёмники (обычноConsoleSinkвpytest -s), иsink.records()пуст, хотя код отправлял строки. - Assertion на timestamp.
time_unix_nano— это значение wall-clock; assertion'ы должны использовать диапазоны (abs(record.time_unix_nano - now) < 5_000_000_000) или вовсе пропускать поле. - Захватили не тот логгер.
Logger.get("order_service")иLogger.get("order_service.checkout")— разные логгеры. Scope на родителе захватывает только записи, отправленные через родителя или его потомков, но не через соседа вродеLogger.get("order_service_audit"). - Многопоточные тесты.
InMemorySinkпотокобезопасен (использует внутренний lock и дляemit, и дляrecords()), но scoped-переопределение действует на контекст потока, владеющего scope. Если worker-поток внутри теста дорабатывает после выхода из scope, его отправки попадут обратно в глобальные приёмники.
См. также
- Локальные переопределения — полная механика
scope_sinks. - Приёмники → InMemorySink — протокол и семантика capacity.
- Поля LogRecord — как выглядит типизированный снимок.