Перейти к основному содержимому

Захват логов в тестах

InMemorySink — приёмник на основе кольцевого буфера, спроектированный для тестов. В сочетании с scope_sinks (по спеке §6.2) он захватывает только записи, отправленные внутри блока, чтобы тесты могли делать assertion'ы по захваченным записям без помех от соседних тестов.

Шаг 1. Захват записей одного теста

from dagstack.logger import InMemorySink, Logger


def test_order_placement_logs_audit_event():
sink = InMemorySink(capacity=100)
logger = Logger.get("order_service.checkout")

with logger.scope_sinks([sink]):
place_order(order_id=1234, user_id=42)

records = sink.records()
audit = next(r for r in records if r.body == "order placed")
assert audit.severity_text == "INFO"
assert audit.attributes["order.id"] == 1234
assert audit.attributes["user.id"] == 42

Scoped-блок подменяет приёмники на исходном логгере; любой другой модуль, разрешающий тот же логгер через Logger.get("order_service.checkout") пока активен блок, тоже пишет в sink. На выходе предыдущие приёмники восстанавливаются.

Шаг 2. Переиспользуемая фикстура

Pytest-сьюты обычно оборачивают этот паттерн в фикстуру. Фикстура создаёт свежий InMemorySink, захватывает записи во время теста и автоматически разбирается:

import pytest
from dagstack.logger import InMemorySink, Logger


@pytest.fixture
def captured_logs():
"""Захват записей, отправленных логгером `order_service` во время теста."""
sink = InMemorySink(capacity=1000)
logger = Logger.get("order_service")
with logger.scope_sinks([sink]):
yield sink


def test_audit_trail(captured_logs):
place_order(order_id=1234, user_id=42)
records = captured_logs.records()
assert len(records) == 1
assert records[0].attributes["order.id"] == 1234

Шаг 3. Assertion'ы по атрибутам

Записи — типизированные объекты (а не JSON-строки), так что доступ к атрибутам прямой. Примеры распространённых проверок:

def test_redaction_masks_api_keys(captured_logs):
Logger.get("order_service").info("authenticated", attributes={
"user.id": 42,
"api_key": "sk-supersecret",
})

record = captured_logs.records()[0]
assert record.attributes["user.id"] == 42
assert record.attributes["api_key"] == "***"


def test_only_one_error_emitted(captured_logs):
run_business_logic()
errors = [r for r in captured_logs.records() if r.severity_text == "ERROR"]
assert len(errors) == 1
assert errors[0].attributes["exception.type"] == "OrderValidationError"


def test_trace_context_propagated(captured_logs):
with open_otel_span(name="checkout", trace_id=expected_trace_id):
Logger.get("order_service").info("inside span")

record = captured_logs.records()[0]
assert record.trace_id == expected_trace_id
assert record.span_id is not None

Шаг 4. Сброс между assertion'ами

InMemorySink.clear() опустошает буфер для следующей проверки в том же тесте:

def test_phase_separation(captured_logs):
run_phase_one()
assert any(r.body == "phase 1 complete" for r in captured_logs.records())

captured_logs.clear()

run_phase_two()
assert all(r.attributes.get("phase") == "two" for r in captured_logs.records())

Шаг 5. Избегание переполнения capacity

Дефолтный capacity=1000 подходит для большинства тестов. Для нагрузочных тестов или batch-операций, отправляющих тысячи записей, либо подними cap, либо проверяй в реальном времени:

def test_high_volume(captured_logs):
sink = InMemorySink(capacity=10_000)
with Logger.get("indexer").scope_sinks([sink]):
index_repository()

assert len(sink.records()) <= sink.capacity
assert any(r.attributes.get("event.name") == "completed" for r in sink.records())

При превышении capacity отбрасываются самые старые записи — sink.records() всегда отражает самые свежие до capacity. Это совпадает с production-поведением InMemorySink и предотвращает неограниченный рост памяти в долгих тестах.

Распространённые ошибки

  • Забыли обернуть тест в scope_sinks. Записи утекают в глобальные приёмники (обычно ConsoleSink в pytest -s), и sink.records() пуст, хотя код отправлял строки.
  • Assertion на timestamp. time_unix_nano — это значение wall-clock; assertion'ы должны использовать диапазоны (abs(record.time_unix_nano - now) < 5_000_000_000) или вовсе пропускать поле.
  • Захватили не тот логгер. Logger.get("order_service") и Logger.get("order_service.checkout") — разные логгеры. Scope на родителе захватывает только записи, отправленные через родителя или его потомков, но не через соседа вроде Logger.get("order_service_audit").
  • Многопоточные тесты. InMemorySink потокобезопасен (использует внутренний lock и для emit, и для records()), но scoped-переопределение действует на контекст потока, владеющего scope. Если worker-поток внутри теста дорабатывает после выхода из scope, его отправки попадут обратно в глобальные приёмники.

См. также