Implement a custom sink
A sink is any object that implements the four methods of the Sink protocol: emit, flush, close, supports_severity, plus an id attribute. Bindings use structural typing (Python Protocol, TypeScript interface, Go interface) — there is no base class to inherit from. Conformance is on shape, not class identity.
Step 1. Pick the right pattern
Two common categories:
- Synchronous sink — writes to a local resource (file, stdout, in-memory). The `emit` call performs the I/O directly under a lock; `flush` and `close` wrap the resource lifecycle. The Phase 1 sinks (`ConsoleSink`, `FileSink`, `InMemorySink`) follow this pattern.
- Async sink with worker — writes to a remote endpoint (HTTP, gRPC, network socket). The `emit` call enqueues the record into an in-memory buffer; a background worker drains the buffer in batches. `flush(timeout)` blocks until the buffer is empty or the timeout expires. The planned `OTLPSink` (Phase 2) follows this pattern.
This guide covers the synchronous case first because it has fewer moving parts. The async-worker pattern is sketched at the end.
Step 2. Implement the protocol
A minimal sink that forwards records to a callable (useful for piping into Sentry, structlog, or a custom format):
- Python
- TypeScript
- Go
import threading
from typing import Callable
from dagstack.logger import LogRecord, Sink
class CallbackSink:
    """Forward each LogRecord to a user-supplied callable.

    Conforms to the Sink protocol structurally — there is no base class.
    Redacted attribute values (spec §10) are passed through to the callable
    unchanged; this sink never re-processes or unmasks them.

    Args:
        callback: Invoked once per record that passes the severity filter.
        min_severity: Records with a lower ``severity_number`` are skipped.
    """

    def __init__(
        self,
        callback: "Callable[[LogRecord], None]",
        *,
        min_severity: int = 1,
    ) -> None:
        self._callback = callback
        self._min_severity = min_severity
        self._lock = threading.Lock()
        self._closed = False
        # Not every callable has __name__ (functools.partial, instances
        # with __call__, bound C functions). Fall back to "anonymous"
        # instead of raising AttributeError — this matches the TypeScript
        # binding's `callback.name || "anonymous"`.
        self.id = f"callback:{getattr(callback, '__name__', 'anonymous')}"

    def emit(self, record: "LogRecord") -> None:
        """Forward *record* to the callback unless closed or filtered out."""
        if self._closed:
            return
        if not self.supports_severity(record.severity_number):
            return
        with self._lock:
            # Re-check under the lock: close() may have won the race
            # between the unlocked check above and acquiring the lock.
            if self._closed:
                return
            self._callback(record)

    def flush(self, timeout: float = 5.0) -> None:
        """No-op: the sink is synchronous and buffers nothing."""
        return

    def close(self) -> None:
        """Mark the sink closed; later emit() calls are ignored. Idempotent."""
        with self._lock:
            self._closed = True

    def supports_severity(self, severity_number: int) -> bool:
        """Return True when *severity_number* meets the configured minimum."""
        return severity_number >= self._min_severity
import type { FlushResult, LogRecord, Sink } from "@dagstack/logger";
type RecordCallback = (record: LogRecord) => void;
/**
 * Forward each LogRecord to a user-supplied callback.
 *
 * Redacted attribute values arrive already masked and are passed through
 * to the callback unchanged.
 */
export class CallbackSink implements Sink {
  public readonly id: string;
  private readonly callback: RecordCallback;
  private readonly minSeverity: number;
  private closed = false;

  constructor(callback: RecordCallback, options: { minSeverity?: number } = {}) {
    const { minSeverity } = options;
    this.minSeverity = minSeverity === undefined || minSeverity === null ? 1 : minSeverity;
    this.callback = callback;
    // Anonymous functions have an empty `name`; label them explicitly.
    this.id = callback.name ? `callback:${callback.name}` : "callback:anonymous";
  }

  emit(record: LogRecord): void {
    // Skip silently when closed or below the severity floor.
    if (this.closed || !this.supportsSeverity(record.severity_number)) {
      return;
    }
    this.callback(record);
  }

  async flush(_timeoutMs?: number): Promise<FlushResult> {
    // Synchronous sink — nothing is buffered, so flushing always succeeds.
    const result: FlushResult = { ok: true };
    return result;
  }

  async close(): Promise<void> {
    // Idempotent: repeated calls simply leave the flag set.
    this.closed = true;
  }

  supportsSeverity(severityNumber: number): boolean {
    return severityNumber >= this.minSeverity;
  }
}
package mysinks
import (
"fmt"
"sync"
"go.dagstack.dev/logger"
)
// CallbackSink forwards each LogRecord to a user-supplied callback.
// It conforms to the logger's sink interface structurally; redacted
// attribute values are passed through to the callback unchanged.
type CallbackSink struct {
	callback    func(*logger.LogRecord) // receives every record that passes the filter
	minSeverity int                     // records below this severity number are skipped
	id          string                  // stable identifier, e.g. "callback:0x..."
	mu          sync.Mutex              // guards closed
	closed      bool                    // set by Close; Emit becomes a no-op afterwards
}
// NewCallbackSink builds a CallbackSink that forwards records at or above
// minSeverity to callback. The id embeds the callback's address so two
// sinks wrapping different functions get distinct identifiers.
func NewCallbackSink(callback func(*logger.LogRecord), minSeverity int) *CallbackSink {
	sink := &CallbackSink{}
	sink.callback = callback
	sink.minSeverity = minSeverity
	sink.id = fmt.Sprintf("callback:%p", callback)
	return sink
}
// ID returns the sink's stable identifier.
func (s *CallbackSink) ID() string {
	return s.id
}
// SupportsSeverity reports whether records at severityNumber pass the
// sink's minimum-severity filter.
func (s *CallbackSink) SupportsSeverity(severityNumber int) bool {
	if severityNumber < s.minSeverity {
		return false
	}
	return true
}
// Emit forwards record to the callback unless the sink is closed or the
// record falls below the severity floor. The callback is invoked while
// the mutex is held so it cannot race with Close — the previous version
// released the lock before calling back, so a concurrent Close could
// complete and the callback would still fire on a closed sink. This also
// matches the Python binding, which holds its lock across the callback.
func (s *CallbackSink) Emit(record *logger.LogRecord) {
	if record == nil {
		return
	}
	if !s.SupportsSeverity(record.SeverityNumber) {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	s.callback(record)
}
// Flush is a no-op: the sink is synchronous and buffers nothing.
func (s *CallbackSink) Flush(_ float64) error {
	return nil
}
// Close marks the sink closed; subsequent Emit calls are ignored.
// Safe to call more than once.
func (s *CallbackSink) Close() error {
	s.mu.Lock()
	s.closed = true
	s.mu.Unlock()
	return nil
}
Wire it up alongside the built-in sinks:
- Python
- TypeScript
- Go
import sentry_sdk
from dagstack.logger import ConsoleSink, configure
def forward_to_sentry(record):
    """Mirror records at ERROR severity (17) or above into Sentry.

    Attribute values arrive already redacted (spec §10) and are passed
    through to Sentry unchanged.
    """
    if record.severity_number < 17:  # below ERROR — nothing to report
        return
    sentry_sdk.capture_message(
        str(record.body),
        level="error",
        extras=record.attributes,
    )
# Register the custom sink alongside the built-in console sink; records at
# ERROR (17) and above are forwarded to Sentry as well as the console.
configure(
    root_level="INFO",
    sinks=[
        ConsoleSink(mode="auto"),
        CallbackSink(forward_to_sentry, min_severity=17),
    ],
)
import * as Sentry from "@sentry/node";
import { ConsoleSink, configure, type LogRecord } from "@dagstack/logger";
import { CallbackSink } from "./callback-sink";
/**
 * Mirror records at ERROR severity (17) or above into Sentry.
 * Attribute values arrive already redacted and are passed through unchanged.
 */
function forwardToSentry(record: LogRecord): void {
  if (record.severity_number < 17) {
    return; // below ERROR — nothing to report
  }
  Sentry.captureMessage(String(record.body), {
    level: "error",
    extra: { ...record.attributes },
  });
}
// Register the custom sink alongside the built-in console sink; records at
// ERROR (17) and above are forwarded to Sentry as well as the console.
configure({
  rootLevel: "INFO",
  sinks: [
    new ConsoleSink({ mode: "auto" }),
    new CallbackSink(forwardToSentry, { minSeverity: 17 }),
  ],
});
import (
	"fmt"

	"github.com/getsentry/sentry-go"
	"go.dagstack.dev/logger"
)
// forwardToSentry mirrors records at ERROR severity or above into Sentry.
// Attribute values arrive already redacted and are passed through unchanged.
func forwardToSentry(record *logger.LogRecord) {
	// Guard against nil for consistency with CallbackSink.Emit, which
	// the sink invokes this callback from.
	if record == nil || record.SeverityNumber < int(logger.SeverityError) {
		return
	}
	event := sentry.NewEvent()
	event.Level = sentry.LevelError
	event.Message = fmt.Sprint(record.Body)
	event.Extra = map[string]any(record.Attributes)
	sentry.CaptureEvent(event)
}
// Register the custom sink alongside the built-in console sink; records at
// SeverityError and above are forwarded to Sentry as well as the console.
logger.Configure(
	logger.WithRootLevel("INFO"),
	logger.WithSinks(
		logger.NewConsoleSink(logger.ConsoleAuto, nil, 1),
		NewCallbackSink(forwardToSentry, int(logger.SeverityError)),
	),
)
Step 3. Honour the non-blocking contract
Per spec §7.1 the emit call must not block the caller of logger.info(...). Concretely:
- A network call inside `emit` blocks the application's request thread. Wrong.
- A bounded queue plus a background worker that drains it. Right.
- A lock taken for the duration of a local file write. Acceptable when the file is on local disk and writes are fast (< 1 ms).
For local resources (Phase 1 sinks), the synchronous pattern is fine. For remote resources, follow the async-worker pattern in Step 5.
Step 4. Treat redacted attributes as opaque
By the time emit is called, redaction has already run on the record's attributes (per spec §10). Values for keys ending in _key, _secret, etc. are already "***". Your sink must not try to recover the original value, log unmasked attributes elsewhere, or re-process the value through any pattern matcher that could leak it.
If your sink re-formats records into a vendor-specific shape (e.g., Sentry, Loki, Splunk), pass the masked values through unchanged. Document this guarantee in the sink's docstring so reviewers can verify at a glance.
Step 5. Async-worker pattern (Phase 2 sketch)
The pattern for remote sinks (OTLP, Loki, etc.):
# Sketch — full implementation lands with OTLPSink in Phase 2.
import queue
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dagstack.logger import LogRecord
class RemoteSink:
    """Sketch of an async-worker sink that ships records to a remote endpoint.

    emit() never blocks: records are enqueued into a bounded queue and a
    daemon worker drains them in batches of ``batch_size``. On overflow the
    oldest record is dropped and ``_dropped`` is incremented (spec §12 drop
    accounting).

    Args:
        endpoint: Remote URL the worker POSTs batches to.
        batch_size: Maximum records per outgoing batch.
        queue_size: Capacity of the in-memory buffer; overflow drops oldest.
    """

    def __init__(self, endpoint: str, *, batch_size: int = 100, queue_size: int = 10_000):
        self._endpoint = endpoint
        self._batch_size = batch_size
        self._queue: queue.Queue = queue.Queue(maxsize=queue_size)
        self._dropped = 0  # dropped_total, exposed via the metrics surface
        self._stop = threading.Event()
        # Set id before the worker starts so the sink is fully initialised
        # by the time any other thread can observe it.
        self.id = f"remote:{endpoint}"
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def emit(self, record: "LogRecord") -> None:
        """Enqueue *record* without blocking; drop the oldest on overflow."""
        try:
            self._queue.put_nowait(record)
        except queue.Full:
            # Drop the oldest — non-blocking emit is the contract.
            try:
                self._queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self._queue.put_nowait(record)
            except queue.Full:
                self._dropped += 1

    def flush(self, timeout: float = 5.0) -> None:
        """Block until the queue drains or *timeout* seconds elapse.

        The original sketch called a nonexistent ``Queue.join_with_timeout``
        helper (and the worker never calls ``task_done()``, so ``join()``
        would deadlock). A deadline poll on ``empty()`` is good enough for
        the sketch.
        """
        import time

        deadline = time.monotonic() + timeout
        while not self._queue.empty() and time.monotonic() < deadline:
            time.sleep(0.01)

    def close(self) -> None:
        """Stop the worker and wait (bounded) for it to drain and exit."""
        self._stop.set()
        self._worker.join(timeout=5.0)

    def supports_severity(self, severity_number: int) -> bool:
        """Remote sinks accept every severity; filtering happens upstream."""
        return True

    def _run(self) -> None:
        # Worker loop: batch up records and ship them. The short get()
        # timeout doubles as the stop-flag poll interval, bounding close()
        # latency to ~0.1 s.
        batch: list = []
        while not self._stop.is_set():
            try:
                record = self._queue.get(timeout=0.1)
            except queue.Empty:
                # Idle tick: ship a partial batch so low-traffic records
                # are not delayed until the batch fills (the original only
                # sent on a full batch or at shutdown).
                if batch:
                    self._send(batch)
                    batch = []
                continue
            batch.append(record)
            if len(batch) >= self._batch_size:
                self._send(batch)
                batch = []
        # Drain anything still queued at stop time so records emitted just
        # before close() are not silently lost.
        while True:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        if batch:
            self._send(batch)

    def _send(self, batch: list) -> None:
        # POST to self._endpoint; on failure, increment a drop counter.
        ...
Three invariants for the async case:
- Bounded queue. `queue_size` caps the number of pending records; overflow drops the oldest (or, if configured, blocks the caller).
- Drop accounting. Track `dropped_total` per sink; expose it through the logger's metrics surface (spec §12).
- Daemon worker. The worker thread is a daemon so it does not prevent the process from exiting. `close()` joins the thread with a timeout.
Step 6. Test the sink
InMemorySink is the canonical reference for a synchronous sink — read its source for a checklist of edge cases (concurrent emit + close, capacity overflow, idempotent close). Your test suite should cover at least:
- Basic emit + retrieval. The sink stores or forwards records as expected.
- Severity filtering. Records below `min_severity` are skipped without side effects.
- Idempotent close. Calling `close()` twice does not raise.
- Concurrent emit + close. The sink does not write to a closed resource.
- Drop accounting (async case only). The drop counter increments by exactly the number of dropped records.
Spec §14.4 lists the conformance tests for the Phase 1 sinks; mirror them for your custom sink.
See also
- Sinks — protocol overview and Phase 1 reference implementations.
- Wire formats — how to serialise records if your sink emits bytes.
- Redaction — what your sink receives as input.
- ADR-0001 §7 (full normative text).