Реализация собственного приёмника
Приёмник — это любой объект, реализующий четыре метода протокола Sink: emit, flush, close, supports_severity, плюс атрибут id. Биндинги используют структурную типизацию (Python Protocol, TypeScript-интерфейс, Go-интерфейс) — нет базового класса для наследования. Conformance проверяется по форме, а не по identity класса.
Шаг 1. Выбор подходящего паттерна
Две распространённые категории:
- Синхронный приёмник — пишет в локальный ресурс (файл, stdout, in-memory). Вызов
emitвыполняет I/O напрямую под локом;flushиcloseоборачивают жизненный цикл ресурса. Phase 1-приёмники (ConsoleSink,FileSink,InMemorySink) идут по этому паттерну. - Async-приёмник с worker'ом — пишет в удалённый эндпоинт (HTTP, gRPC, сетевой socket). Вызов
emitставит запись во внутреннюю буферную очередь; фоновый worker пакетами вычитывает буфер.flush(timeout)блокирует, пока буфер не опустеет или не истечёт timeout. ЗапланированныйOTLPSink(Phase 2) идёт по этому паттерну.
В этом guide сначала разбирается синхронный случай — у него меньше движущихся частей. Паттерн async-worker набросан в конце.
Шаг 2. Реализация протокола
Минимальный приёмник, передающий записи в callable (полезно для проброса в Sentry, structlog или собственный формат):
- Python
- TypeScript
- Go
import threading
from typing import Callable
from dagstack.logger import LogRecord, Sink
class CallbackSink:
"""Передаёт каждую LogRecord пользовательскому callable."""
def __init__(
self,
callback: Callable[[LogRecord], None],
*,
min_severity: int = 1,
) -> None:
self._callback = callback
self._min_severity = min_severity
self._lock = threading.Lock()
self._closed = False
self.id = f"callback:{callback.__name__}"
def emit(self, record: LogRecord) -> None:
if self._closed:
return
if not self.supports_severity(record.severity_number):
return
with self._lock:
if self._closed:
return
self._callback(record)
def flush(self, timeout: float = 5.0) -> None:
# Синхронный — буферизации нет.
return
def close(self) -> None:
with self._lock:
self._closed = True
def supports_severity(self, severity_number: int) -> bool:
return severity_number >= self._min_severity
import type { FlushResult, LogRecord, Sink } from "@dagstack/logger";
type RecordCallback = (record: LogRecord) => void;
/** Передаёт каждый LogRecord в пользовательский callback. */
export class CallbackSink implements Sink {
public readonly id: string;
private readonly callback: RecordCallback;
private readonly minSeverity: number;
private closed = false;
constructor(callback: RecordCallback, options: { minSeverity?: number } = {}) {
this.callback = callback;
this.minSeverity = options.minSeverity ?? 1;
this.id = `callback:${callback.name || "anonymous"}`;
}
emit(record: LogRecord): void {
if (this.closed) return;
if (!this.supportsSeverity(record.severity_number)) return;
this.callback(record);
}
async flush(_timeoutMs?: number): Promise<FlushResult> {
return { ok: true };
}
async close(): Promise<void> {
this.closed = true;
}
supportsSeverity(severityNumber: number): boolean {
return severityNumber >= this.minSeverity;
}
}
package mysinks
import (
"fmt"
"sync"
"go.dagstack.dev/logger"
)
// CallbackSink передаёт каждый LogRecord в пользовательский callback.
type CallbackSink struct {
callback func(*logger.LogRecord)
minSeverity int
id string
mu sync.Mutex
closed bool
}
func NewCallbackSink(callback func(*logger.LogRecord), minSeverity int) *CallbackSink {
return &CallbackSink{
callback: callback,
minSeverity: minSeverity,
id: fmt.Sprintf("callback:%p", callback),
}
}
func (s *CallbackSink) ID() string { return s.id }
func (s *CallbackSink) SupportsSeverity(severityNumber int) bool {
return severityNumber >= s.minSeverity
}
func (s *CallbackSink) Emit(record *logger.LogRecord) {
if record == nil {
return
}
if !s.SupportsSeverity(record.SeverityNumber) {
return
}
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return
}
s.mu.Unlock()
s.callback(record)
}
func (s *CallbackSink) Flush(_ float64) error { return nil }
func (s *CallbackSink) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
return nil
}
Подключи приёмник рядом со встроенными:
- Python
- TypeScript
- Go
import sentry_sdk
from dagstack.logger import ConsoleSink, configure
def forward_to_sentry(record):
if record.severity_number >= 17: # ERROR и выше
sentry_sdk.capture_message(
str(record.body),
level="error",
extras=record.attributes,
)
configure(
root_level="INFO",
sinks=[
ConsoleSink(mode="auto"),
CallbackSink(forward_to_sentry, min_severity=17),
],
)
import * as Sentry from "@sentry/node";
import { ConsoleSink, configure, type LogRecord } from "@dagstack/logger";
import { CallbackSink } from "./callback-sink";
function forwardToSentry(record: LogRecord): void {
if (record.severity_number >= 17) { // ERROR и выше
Sentry.captureMessage(String(record.body), {
level: "error",
extra: { ...record.attributes },
});
}
}
configure({
rootLevel: "INFO",
sinks: [
new ConsoleSink({ mode: "auto" }),
new CallbackSink(forwardToSentry, { minSeverity: 17 }),
],
});
import (
"github.com/getsentry/sentry-go"
"go.dagstack.dev/logger"
)
func forwardToSentry(record *logger.LogRecord) {
if record.SeverityNumber >= int(logger.SeverityError) {
event := sentry.NewEvent()
event.Level = sentry.LevelError
event.Message = fmt.Sprint(record.Body)
event.Extra = map[string]any(record.Attributes)
sentry.CaptureEvent(event)
}
}
logger.Configure(
logger.WithRootLevel("INFO"),
logger.WithSinks(
logger.NewConsoleSink(logger.ConsoleAuto, nil, 1),
NewCallbackSink(forwardToSentry, int(logger.SeverityError)),
),
)
Шаг 3. Соблюдение неблокирующего контракта
По спеке §7.1 вызов emit не должен блокировать вызывающего logger.info(...). Конкретно:
- Сетевой вызов внутри
emitблокирует поток запроса приложения. Нельзя. - Ограниченная очередь плюс фоновый worker, вычитывающий её. Можно.
- Лок на длительность записи в локальный файл. Допустимо, когда файл на локальном диске и запись быстра (< 1 мс).
Для локальных ресурсов (Phase 1-приёмники) синхронный паттерн в порядке. Для удалённых — следуй паттерну async-worker в шаге 5.
Шаг 4. Обращение с маскированными атрибутами как с непрозрачными
К моменту вызова emit маскирование на атрибутах записи уже отработало (по спеке §10). Значения по ключам, оканчивающимся на _key, _secret и т.п., уже превратились в "***". Твой приёмник не должен пытаться восстановить исходное значение, логировать немаскированные атрибуты в другое место или прогонять значение через любой pattern matcher, способный его утечь.
Если приёмник пере-форматирует записи в vendor-специфичную форму (например, Sentry, Loki, Splunk), маскированные значения должны проходить без изменений. Гарантируй это в docstring приёмника, чтобы ревьюер видел сразу.
Шаг 5. Паттерн async-worker (набросок Phase 2)
Паттерн для удалённых приёмников (OTLP, Loki и др.):
# Набросок — полная реализация выйдет с OTLPSink в Phase 2.
import queue
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dagstack.logger import LogRecord
class RemoteSink:
def __init__(self, endpoint: str, *, batch_size: int = 100, queue_size: int = 10_000):
self._endpoint = endpoint
self._batch_size = batch_size
self._queue: queue.Queue = queue.Queue(maxsize=queue_size)
self._dropped = 0
self._stop = threading.Event()
self._worker = threading.Thread(target=self._run, daemon=True)
self._worker.start()
self.id = f"remote:{endpoint}"
def emit(self, record: "LogRecord") -> None:
try:
self._queue.put_nowait(record)
except queue.Full:
# Отбрасываем самую старую — неблокирующий emit это контракт.
try:
self._queue.get_nowait()
except queue.Empty:
pass
try:
self._queue.put_nowait(record)
except queue.Full:
self._dropped += 1
def flush(self, timeout: float = 5.0) -> None:
self._queue.join_with_timeout(timeout) # кастомный хелпер, опущен
def close(self) -> None:
self._stop.set()
self._worker.join(timeout=5.0)
def supports_severity(self, severity_number: int) -> bool:
return True
def _run(self) -> None:
batch = []
while not self._stop.is_set():
try:
record = self._queue.get(timeout=1.0)
except queue.Empty:
continue
batch.append(record)
if len(batch) >= self._batch_size:
self._send(batch)
batch = []
if batch:
self._send(batch)
def _send(self, batch: list) -> None:
# POST на self._endpoint; при ошибке инкрементировать drop-счётчик.
...
Три инварианта для async-случая:
- Ограниченная очередь.
queue_sizeлимитирует число pending-записей; при переполнении отбрасывается самая старая (или, если так настроено, блокируется вызывающий). - Учёт отбрасываний. Веди
dropped_totalна каждый приёмник; выставляй наружу через метрики логгера (спека §12). - Daemon-worker. Поток worker'а — daemon, чтобы не мешать выходу из процесса.
close()join'ит поток с timeout.
Шаг 6. Тестирование приёмника
InMemorySink — каноническая референсная реализация синхронного приёмника — читай его исходник как чек-лист пограничных случаев (конкурентный emit + close, переполнение capacity, идемпотентный close). Твой тест-сьют должен покрывать как минимум:
- Базовый emit + извлечение. Приёмник сохраняет или передаёт записи как ожидается.
- Фильтрация по уровню. Записи ниже
min_severityпропускаются без побочных эффектов. - Идемпотентный close. Повторный
close()не выкидывает исключение. - Конкурентный emit + close. Приёмник не пишет в закрытый ресурс.
- Учёт drop'ов (только async-случай). Drop-счётчик увеличивается ровно на число отброшенных записей.
Спека §14.4 перечисляет conformance-тесты для Phase 1-приёмников; повтори их для своего приёмника.
См. также
- Приёмники — обзор протокола и эталонные реализации Phase 1.
- Форматы передачи — как сериализовать записи, если приёмник эмитит байты.
- Маскирование — что приёмник получает на вход.
- ADR-0001 §7 (полный нормативный текст).