Перейти к основному содержимому

Реализация собственного приёмника

Приёмник — это любой объект, реализующий четыре метода протокола Sink: emit, flush, close, supports_severity, плюс атрибут id. Биндинги используют структурную типизацию (Python Protocol, TypeScript-интерфейс, Go-интерфейс) — нет базового класса для наследования. Conformance проверяется по форме, а не по identity класса.

Шаг 1. Выбор подходящего паттерна

Две распространённые категории:

  • Синхронный приёмник — пишет в локальный ресурс (файл, stdout, in-memory). Вызов emit выполняет I/O напрямую под локом; flush и close оборачивают жизненный цикл ресурса. Phase 1-приёмники (ConsoleSink, FileSink, InMemorySink) идут по этому паттерну.
  • Async-приёмник с worker'ом — пишет в удалённый эндпоинт (HTTP, gRPC, сетевой socket). Вызов emit ставит запись во внутреннюю буферную очередь; фоновый worker пакетами вычитывает буфер. flush(timeout) блокирует, пока буфер не опустеет или не истечёт timeout. Запланированный OTLPSink (Phase 2) идёт по этому паттерну.

В этом guide сначала разбирается синхронный случай — у него меньше движущихся частей. Паттерн async-worker набросан в конце.

Шаг 2. Реализация протокола

Минимальный приёмник, передающий записи в callable (полезно для проброса в Sentry, structlog или собственный формат):

import threading
from typing import Callable

from dagstack.logger import LogRecord, Sink


class CallbackSink:
"""Передаёт каждую LogRecord пользовательскому callable."""

def __init__(
self,
callback: Callable[[LogRecord], None],
*,
min_severity: int = 1,
) -> None:
self._callback = callback
self._min_severity = min_severity
self._lock = threading.Lock()
self._closed = False
self.id = f"callback:{callback.__name__}"

def emit(self, record: LogRecord) -> None:
if self._closed:
return
if not self.supports_severity(record.severity_number):
return
with self._lock:
if self._closed:
return
self._callback(record)

def flush(self, timeout: float = 5.0) -> None:
# Синхронный — буферизации нет.
return

def close(self) -> None:
with self._lock:
self._closed = True

def supports_severity(self, severity_number: int) -> bool:
return severity_number >= self._min_severity

Подключи приёмник рядом со встроенными:

import sentry_sdk
from dagstack.logger import ConsoleSink, configure


def forward_to_sentry(record):
if record.severity_number >= 17: # ERROR и выше
sentry_sdk.capture_message(
str(record.body),
level="error",
extras=record.attributes,
)


configure(
root_level="INFO",
sinks=[
ConsoleSink(mode="auto"),
CallbackSink(forward_to_sentry, min_severity=17),
],
)

Шаг 3. Соблюдение неблокирующего контракта

По спеке §7.1 вызов emit не должен блокировать вызывающего logger.info(...). Конкретно:

  • Сетевой вызов внутри emit блокирует поток запроса приложения. Нельзя.
  • Ограниченная очередь плюс фоновый worker, вычитывающий её. Можно.
  • Лок на длительность записи в локальный файл. Допустимо, когда файл на локальном диске и запись быстра (< 1 мс).

Для локальных ресурсов (Phase 1-приёмники) синхронный паттерн в порядке. Для удалённых — следуй паттерну async-worker в шаге 5.

Шаг 4. Обращение с маскированными атрибутами как с непрозрачными

К моменту вызова emit маскирование на атрибутах записи уже отработало (по спеке §10). Значения по ключам, оканчивающимся на _key, _secret и т.п., уже превратились в "***". Твой приёмник не должен пытаться восстановить исходное значение, логировать немаскированные атрибуты в другое место или прогонять значение через любой pattern matcher, способный его утечь.

Если приёмник пере-форматирует записи в vendor-специфичную форму (например, Sentry, Loki, Splunk), маскированные значения должны проходить без изменений. Гарантируй это в docstring приёмника, чтобы ревьюер видел сразу.

Шаг 5. Паттерн async-worker (набросок Phase 2)

Паттерн для удалённых приёмников (OTLP, Loki и др.):

# Набросок — полная реализация выйдет с OTLPSink в Phase 2.

import queue
import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from dagstack.logger import LogRecord


class RemoteSink:
def __init__(self, endpoint: str, *, batch_size: int = 100, queue_size: int = 10_000):
self._endpoint = endpoint
self._batch_size = batch_size
self._queue: queue.Queue = queue.Queue(maxsize=queue_size)
self._dropped = 0
self._stop = threading.Event()
self._worker = threading.Thread(target=self._run, daemon=True)
self._worker.start()
self.id = f"remote:{endpoint}"

def emit(self, record: "LogRecord") -> None:
try:
self._queue.put_nowait(record)
except queue.Full:
# Отбрасываем самую старую — неблокирующий emit это контракт.
try:
self._queue.get_nowait()
except queue.Empty:
pass
try:
self._queue.put_nowait(record)
except queue.Full:
self._dropped += 1

def flush(self, timeout: float = 5.0) -> None:
self._queue.join_with_timeout(timeout) # кастомный хелпер, опущен

def close(self) -> None:
self._stop.set()
self._worker.join(timeout=5.0)

def supports_severity(self, severity_number: int) -> bool:
return True

def _run(self) -> None:
batch = []
while not self._stop.is_set():
try:
record = self._queue.get(timeout=1.0)
except queue.Empty:
continue
batch.append(record)
if len(batch) >= self._batch_size:
self._send(batch)
batch = []
if batch:
self._send(batch)

def _send(self, batch: list) -> None:
# POST на self._endpoint; при ошибке инкрементировать drop-счётчик.
...

Три инварианта для async-случая:

  1. Ограниченная очередь. queue_size лимитирует число pending-записей; при переполнении отбрасывается самая старая (или, если так настроено, блокируется вызывающий).
  2. Учёт отбрасываний. Веди dropped_total на каждый приёмник; выставляй наружу через метрики логгера (спека §12).
  3. Daemon-worker. Поток worker'а — daemon, чтобы не мешать выходу из процесса. close() join'ит поток с timeout.

Шаг 6. Тестирование приёмника

InMemorySink — каноническая референсная реализация синхронного приёмника — читай его исходник как чек-лист пограничных случаев (конкурентный emit + close, переполнение capacity, идемпотентный close). Твой тест-сьют должен покрывать как минимум:

  • Базовый emit + извлечение. Приёмник сохраняет или передаёт записи как ожидается.
  • Фильтрация по уровню. Записи ниже min_severity пропускаются без побочных эффектов.
  • Идемпотентный close. Повторный close() не выкидывает исключение.
  • Конкурентный emit + close. Приёмник не пишет в закрытый ресурс.
  • Учёт drop'ов (только async-случай). Drop-счётчик увеличивается ровно на число отброшенных записей.

Спека §14.4 перечисляет conformance-тесты для Phase 1-приёмников; повтори их для своего приёмника.

См. также