
Implement a custom sink

A sink is any object that implements the four methods of the Sink protocol (emit, flush, close, supports_severity) and carries an id attribute. Bindings use structural typing (Python Protocol, TypeScript interface, Go interface); there is no base class to inherit from. Conformance is on shape, not class identity.
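
In Python, the shape to match looks roughly like the sketch below. The authoritative definition is dagstack.logger.Sink; the signatures here are inferred from the examples in this guide rather than copied from the library source.

from typing import Protocol

from dagstack.logger import LogRecord


class SinkShape(Protocol):
    """Structural sketch of the Sink protocol (inferred, not authoritative)."""

    id: str

    def emit(self, record: LogRecord) -> None: ...
    def flush(self, timeout: float = 5.0) -> None: ...
    def close(self) -> None: ...
    def supports_severity(self, severity_number: int) -> bool: ...

Any class whose attributes match this shape conforms; no registration or subclassing is required.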

Step 1. Pick the right pattern

Two common categories:

  • Synchronous sink — writes to a local resource (file, stdout, in-memory). The emit call performs the I/O directly under a lock; flush and close wrap the resource lifecycle. The Phase 1 sinks (ConsoleSink, FileSink, InMemorySink) follow this pattern.
  • Async sink with worker — writes to a remote endpoint (HTTP, gRPC, network socket). The emit call enqueues the record into an in-memory buffer; a background worker drains the buffer in batches. flush(timeout) blocks until the buffer is empty or the timeout expires. The planned OTLPSink (Phase 2) follows this pattern.

This guide covers the synchronous case first because it has fewer moving parts. The async-worker pattern is sketched at the end.

Step 2. Implement the protocol

A minimal sink that forwards records to a callable (useful for piping into Sentry, structlog, or a custom format):

import threading
from typing import Callable

from dagstack.logger import LogRecord, Sink


class CallbackSink:
    """Forward each LogRecord to a user-supplied callable."""

    def __init__(
        self,
        callback: Callable[[LogRecord], None],
        *,
        min_severity: int = 1,
    ) -> None:
        self._callback = callback
        self._min_severity = min_severity
        self._lock = threading.Lock()
        self._closed = False
        self.id = f"callback:{callback.__name__}"

    def emit(self, record: LogRecord) -> None:
        if self._closed:
            return
        if not self.supports_severity(record.severity_number):
            return
        with self._lock:
            if self._closed:
                return
            self._callback(record)

    def flush(self, timeout: float = 5.0) -> None:
        # Synchronous — nothing buffered.
        return

    def close(self) -> None:
        with self._lock:
            self._closed = True

    def supports_severity(self, severity_number: int) -> bool:
        return severity_number >= self._min_severity

Wire it up alongside the built-in sinks:

import sentry_sdk
from dagstack.logger import ConsoleSink, configure


def forward_to_sentry(record):
    if record.severity_number >= 17:  # ERROR and above
        sentry_sdk.capture_message(
            str(record.body),
            level="error",
            extras=record.attributes,
        )


configure(
    root_level="INFO",
    sinks=[
        ConsoleSink(mode="auto"),
        CallbackSink(forward_to_sentry, min_severity=17),
    ],
)

Step 3. Honour the non-blocking contract

Per spec §7.1 the emit call must not block the caller of logger.info(...). Concretely:

  • A network call inside emit blocks the application's request thread. Wrong.
  • A bounded queue plus a background worker that drains it. Right.
  • A lock taken for the duration of a local file write. Acceptable when the file is on local disk and writes are fast (< 1 ms).

For local resources (Phase 1 sinks), the synchronous pattern is fine. For remote resources, follow the async-worker pattern in Step 5.
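
To make the acceptable local case concrete, a bare-bones file sink might look like the sketch below. The class name and line format are illustrative only; the built-in FileSink is the real implementation.

import threading

from dagstack.logger import LogRecord


class TinyFileSink:
    """Illustrative sketch; use the built-in FileSink in practice."""

    def __init__(self, path: str) -> None:
        self._file = open(path, "a", encoding="utf-8")
        self._lock = threading.Lock()
        self._closed = False
        self.id = f"tinyfile:{path}"

    def emit(self, record: LogRecord) -> None:
        # A short lock-protected write to local disk stays within the
        # non-blocking contract; a network call here would not.
        if self._closed:
            return
        with self._lock:
            if not self._closed:
                self._file.write(f"{record.severity_number} {record.body}\n")

    def flush(self, timeout: float = 5.0) -> None:
        with self._lock:
            if not self._closed:
                self._file.flush()

    def close(self) -> None:
        with self._lock:
            if not self._closed:
                self._file.close()
                self._closed = True

    def supports_severity(self, severity_number: int) -> bool:
        return True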

Step 4. Treat redacted attributes as opaque

By the time emit is called, redaction has already run on the record's attributes (per spec §10). Values for keys ending in _key, _secret, etc. are already "***". Your sink must not try to recover the original value, log unmasked attributes elsewhere, or re-process the value through any pattern matcher that could leak it.

If your sink re-formats records into a vendor-specific shape (e.g., Sentry, Loki, Splunk), pass the masked values through unchanged. Document this guarantee in the sink's docstring so reviewers can verify at a glance.
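
For example, a sink that reshapes records for a hypothetical vendor API should copy the attributes verbatim. The payload shape below is an assumption for illustration, not a real vendor schema.

from dagstack.logger import LogRecord


def to_vendor_payload(record: LogRecord) -> dict:
    """Map a record to a hypothetical vendor payload.

    Masked values arrive as "***" and are forwarded unchanged; the sink
    never inspects, unmasks, or re-derives them.
    """
    return {
        "message": str(record.body),
        "severity": record.severity_number,
        # Verbatim copy: whatever redaction produced is what the vendor sees.
        "attributes": dict(record.attributes),
    }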

Step 5. Async-worker pattern (Phase 2 sketch)

The pattern for remote sinks (OTLP, Loki, etc.):

# Sketch — full implementation lands with OTLPSink in Phase 2.

import queue
import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from dagstack.logger import LogRecord


class RemoteSink:
    def __init__(self, endpoint: str, *, batch_size: int = 100, queue_size: int = 10_000):
        self._endpoint = endpoint
        self._batch_size = batch_size
        self._queue: queue.Queue = queue.Queue(maxsize=queue_size)
        self._dropped = 0
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()
        self.id = f"remote:{endpoint}"

    def emit(self, record: "LogRecord") -> None:
        try:
            self._queue.put_nowait(record)
        except queue.Full:
            # Drop the oldest — non-blocking emit is the contract.
            try:
                self._queue.get_nowait()
                self._dropped += 1  # the evicted record counts as dropped
            except queue.Empty:
                pass
            try:
                self._queue.put_nowait(record)
            except queue.Full:
                self._dropped += 1

    def flush(self, timeout: float = 5.0) -> None:
        self._queue.join_with_timeout(timeout)  # custom helper, omitted

    def close(self) -> None:
        self._stop.set()
        self._worker.join(timeout=5.0)

    def supports_severity(self, severity_number: int) -> bool:
        return True

    def _run(self) -> None:
        batch = []
        while not self._stop.is_set():
            try:
                record = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue
            batch.append(record)
            if len(batch) >= self._batch_size:
                self._send(batch)
                batch = []
        if batch:
            self._send(batch)

    def _send(self, batch: list) -> None:
        # POST to self._endpoint; on failure, increment a drop counter.
        ...

Three invariants for the async case:

  1. Bounded queue. queue_size caps the number of pending records; overflow drops the oldest (or, if configured, blocks the caller).
  2. Drop accounting. Track dropped_total per sink and expose it through the logger's metrics surface (spec §12); a minimal sketch of the counter follows this list.
  3. Daemon worker. The worker thread is a daemon so it does not prevent the process from exiting. close() joins the thread with a timeout.
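
How the counter reaches the metrics surface is defined by spec §12 and not shown here. Within the sink, a read-only property over the sketch's _dropped field is enough; MeteredRemoteSink below is our hypothetical extension of the RemoteSink sketch, not part of the planned OTLPSink.

class MeteredRemoteSink(RemoteSink):
    @property
    def dropped_total(self) -> int:
        # Records lost to queue overflow or failed sends, to be reported
        # through the spec §12 metrics surface.
        return self._dropped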

Step 6. Test the sink

InMemorySink is the canonical reference for a synchronous sink — read its source for a checklist of edge cases (concurrent emit + close, capacity overflow, idempotent close). Your test suite should cover at least:

  • Basic emit + retrieval. The sink stores or forwards records as expected.
  • Severity filtering. Records below min_severity are skipped without side effects.
  • Idempotent close. Calling close() twice does not raise.
  • Concurrent emit + close. The sink does not write to a closed resource.
  • Drop accounting (async case only). The drop counter increments by exactly the number of dropped records.

Spec §14.4 lists the conformance tests for the Phase 1 sinks; mirror them for your custom sink.
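
As a starting point, pytest-style tests for the CallbackSink from Step 2 might look like this. The import path and the stand-in record helper are ours; the LogRecord constructor is not shown in this guide, so the tests fabricate a minimal record carrying only the fields the sink reads.

from types import SimpleNamespace

from myapp.sinks import CallbackSink  # adjust to wherever Step 2's class lives


def _record(severity_number: int = 9, body: str = "hello"):
    # Stand-in for LogRecord: only the fields CallbackSink touches.
    return SimpleNamespace(severity_number=severity_number, body=body, attributes={})


def test_emit_forwards_records():
    seen = []
    sink = CallbackSink(seen.append)
    sink.emit(_record())
    assert [r.body for r in seen] == ["hello"]


def test_records_below_min_severity_are_skipped():
    seen = []
    sink = CallbackSink(seen.append, min_severity=17)
    sink.emit(_record(severity_number=9))
    assert seen == []


def test_close_is_idempotent():
    sink = CallbackSink(lambda record: None)
    sink.close()
    sink.close()  # second call must not raise


def test_emit_after_close_is_a_no_op():
    seen = []
    sink = CallbackSink(seen.append)
    sink.close()
    sink.emit(_record())
    assert seen == []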

See also