Sinks
A sink is a destination for LogRecords. The logger fans every emitted record out to every attached sink that passes its min_severity filter. Sinks are independent — a failure in one does not affect the others.
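The fan-out contract (every sink sees each record it accepts, and one sink's failure never blocks the others) can be sketched in a few lines. This is a hypothetical illustration, not the actual dispatch code; `FanOutLogger` is an invented name:

```python
# Illustrative sketch only -- not the dagstack implementation.
class FanOutLogger:
    def __init__(self, sinks):
        self.sinks = sinks

    def emit(self, record):
        for sink in self.sinks:
            # Each sink applies its own min_severity filter independently.
            if record["severity_number"] < sink.min_severity:
                continue
            try:
                sink.emit(record)
            except Exception:
                # Sinks are independent: a failure in one never
                # prevents delivery to the others.
                pass
```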
The Sink protocol
Every sink exposes a diagnostic id and implements four methods:
```
Sink {
  id: string                                  # diagnostic identifier (e.g., "console:auto", "file:/var/log/app.jsonl")
  emit(record): void                          # non-blocking; queues the record for delivery
  flush(timeout): void                        # block until buffered records are delivered or the timeout fires
  close(): void                               # flush + release resources
  supports_severity(severity_number): bool    # filter hint; rejected severities are skipped early
}
```
The emit method does not block the caller. Blocking I/O (network exporters, file flush) goes through an internal buffer plus a worker; if the buffer overflows, the sink drops the oldest record by default and increments a drop counter. The buffer / worker is sink-specific — the Phase 1 ConsoleSink and FileSink synchronise stdout / file writes under a lock, while the planned OTLPSink (Phase 2) uses a background worker.
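The buffer-and-drop policy can be sketched as follows. `BufferedSink` and `deliver` are hypothetical names; the real sinks add locking and, in Phase 2, a background worker thread:

```python
from collections import deque

# Illustrative sketch of the buffer-and-drop policy; not the dagstack code.
class BufferedSink:
    def __init__(self, deliver, capacity=1024):
        self._deliver = deliver      # blocking I/O callable, e.g. a network send
        self._buffer = deque()
        self._capacity = capacity
        self.dropped = 0             # drop counter, incremented on overflow

    def emit(self, record):
        # Non-blocking: enqueue only; never perform I/O on the caller's thread.
        if len(self._buffer) >= self._capacity:
            self._buffer.popleft()   # drop the OLDEST record on overflow
            self.dropped += 1
        self._buffer.append(record)

    def flush(self, timeout=None):
        # A real worker would honour the timeout; this sketch drains eagerly.
        while self._buffer:
            self._deliver(self._buffer.popleft())
```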
Phase 1 sinks
Three sinks ship in the first release of every binding:
| Sink | Where records go | Default mode | Configuration |
|---|---|---|---|
| ConsoleSink | sys.stderr (default) or any text stream | auto (TTY → pretty, non-TTY → JSON) | mode, stream, min_severity |
| FileSink | A file on disk, JSON-lines | JSON-lines with size-based rotation | path, max_bytes, keep, min_severity |
| InMemorySink | An in-process ring buffer | dropped on overflow | capacity, min_severity |
ConsoleSink
ConsoleSink writes records to sys.stderr (or a chosen text stream) as either coloured pretty text or compact JSON-lines. The default mode="auto" switches based on TTY detection — an interactive terminal gets the pretty mode; piped or redirected output gets JSON.
- Python

```python
from dagstack.logger import ConsoleSink

# Auto mode: pretty on a TTY, JSON otherwise.
sink = ConsoleSink(mode="auto")

# Force JSON for container logs.
sink = ConsoleSink(mode="json", min_severity=9)

# Force pretty for a debug terminal.
sink = ConsoleSink(mode="pretty")
```

- TypeScript

```typescript
import { ConsoleSink } from "@dagstack/logger";

// Auto mode: pretty on a TTY, JSON otherwise.
let sink = new ConsoleSink({ mode: "auto" });

// Force JSON for container logs.
sink = new ConsoleSink({ mode: "json", minSeverity: 9 });

// Force pretty for a debug terminal.
sink = new ConsoleSink({ mode: "pretty" });
```

- Go

```go
import (
	"os"

	"go.dagstack.dev/logger"
)

// Auto mode: pretty on a TTY, JSON otherwise. Pass nil for stream → os.Stderr.
sink := logger.NewConsoleSink(logger.ConsoleAuto, nil, 1)

// Force JSON for container logs.
sink = logger.NewConsoleSink(logger.ConsoleJSON, os.Stdout, int(logger.SeverityInfo))

// Force pretty for a debug terminal.
sink = logger.NewConsoleSink(logger.ConsolePretty, nil, 1)
```
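The auto switch amounts to a TTY check along these lines. This is a sketch; `resolve_console_mode` is a hypothetical helper, not part of the API, and each binding's actual detection may differ:

```python
import sys

def resolve_console_mode(mode="auto", stream=None):
    # Approximation of mode="auto": pretty on an interactive TTY, JSON otherwise.
    # (Hypothetical helper -- not a dagstack API.)
    stream = stream or sys.stderr
    if mode != "auto":
        return mode
    return "pretty" if stream.isatty() else "json"
```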
FileSink
FileSink writes canonical JSON-lines to a file, rotating by file size. The Python binding uses the stdlib RotatingFileHandler under the hood — a battle-tested rotation implementation — and applies the dagstack JSON-lines format on top.
- Python

```python
from dagstack.logger import FileSink

sink = FileSink(
    "/var/log/order-service.jsonl",
    max_bytes=100_000_000,  # rotate at 100 MB
    keep=10,                # keep 10 archived files
    min_severity=9,         # INFO and above
)
```

- TypeScript

```typescript
import { FileSink } from "@dagstack/logger";

const sink = new FileSink("/var/log/order-service.jsonl", {
  maxBytes: 100_000_000, // rotate at 100 MB
  keep: 10,              // keep 10 archived files
  minSeverity: 9,        // INFO and above
});
```

- Go

```go
sink, err := logger.NewFileSink(
	"/var/log/order-service.jsonl",
	100_000_000,              // maxBytes — rotate at 100 MB
	10,                       // keep — keep 10 archived files
	int(logger.SeverityInfo), // minSeverity — INFO and above
)
if err != nil {
	// failed to open file
}
```
max_bytes=0 disables rotation (the file grows until the operator truncates or moves it). keep=0 deletes archived files immediately on rotation — only the live file remains.
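The rotation policy, including both edge cases, can be sketched in plain Python. `maybe_rotate` is a hypothetical name; the Python binding actually delegates rotation to RotatingFileHandler:

```python
import os

def maybe_rotate(path, max_bytes, keep):
    # Sketch of size-based rotation; not the binding's implementation.
    if max_bytes == 0:
        return  # rotation disabled: the file grows until an operator intervenes
    if not os.path.exists(path) or os.path.getsize(path) < max_bytes:
        return
    # Shift archives: app.jsonl.1 -> app.jsonl.2, ...; the oldest falls off.
    for i in range(keep - 1, 0, -1):
        src = f"{path}.{i}"
        if os.path.exists(src):
            os.replace(src, f"{path}.{i + 1}")
    if keep > 0:
        os.replace(path, f"{path}.1")
    else:
        os.remove(path)  # keep=0: no archives survive rotation
```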
InMemorySink
InMemorySink accumulates records in a bounded ring buffer; the oldest records are dropped when the buffer is full. Designed for tests and for short-lived diagnostic captures.
- Python

```python
from dagstack.logger import InMemorySink

sink = InMemorySink(capacity=100)
# ... emit some records ...
records = sink.records()  # snapshot copy
assert any(r.body == "expected message" for r in records)
sink.clear()  # reset for the next test
```

- TypeScript

```typescript
import { InMemorySink } from "@dagstack/logger";

const sink = new InMemorySink({ capacity: 100 });
// ... emit some records ...
const records = sink.records(); // snapshot copy
if (!records.some((r) => r.body === "expected message")) {
  throw new Error("missing expected record");
}
sink.clear(); // reset for the next test
```

- Go

```go
sink := logger.NewInMemorySink(100, 1) // capacity=100, minSeverity=1
// ... emit some records ...
records := sink.Records() // snapshot copy
// assertions on records[i].Body, records[i].Attributes, ...
sink.Clear() // reset for the next test
```
InMemorySink does not implement wire serialisation — it stores LogRecord objects directly so test assertions can inspect typed body, attributes, and severity_number without parsing JSON.
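The drop-oldest behaviour maps naturally onto a bounded deque. A minimal sketch (hypothetical `RingBuffer`, not the binding's code):

```python
from collections import deque

# Sketch of the bounded ring buffer behind an InMemorySink-style store;
# deque(maxlen=...) silently discards the oldest entry at capacity.
class RingBuffer:
    def __init__(self, capacity):
        self._items = deque(maxlen=capacity)

    def emit(self, record):
        self._items.append(record)

    def records(self):
        return list(self._items)  # snapshot copy, safe for the caller to mutate

    def clear(self):
        self._items.clear()
```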
Multi-sink routing
A logger can be configured with multiple sinks. Each sink applies its own min_severity filter independently of the others. A common production setup writes WARN and above to the console, INFO and above to a file for forensics, and only errors to a remote sink.
- Python

```python
from dagstack.logger import ConsoleSink, FileSink, configure

configure(
    root_level="DEBUG",
    sinks=[
        ConsoleSink(mode="pretty", min_severity=13),  # WARN+ on the console
        FileSink("/var/log/app.jsonl", max_bytes=100_000_000, keep=10, min_severity=9),
    ],
)
```

- TypeScript

```typescript
import { ConsoleSink, FileSink, configure } from "@dagstack/logger";

configure({
  rootLevel: "DEBUG",
  sinks: [
    new ConsoleSink({ mode: "pretty", minSeverity: 13 }), // WARN+ on the console
    new FileSink("/var/log/app.jsonl", { maxBytes: 100_000_000, keep: 10, minSeverity: 9 }),
  ],
});
```

- Go

```go
fileSink, _ := logger.NewFileSink("/var/log/app.jsonl", 100_000_000, 10, int(logger.SeverityInfo))

logger.Configure(
	logger.WithRootLevel("DEBUG"),
	logger.WithSinks(
		logger.NewConsoleSink(logger.ConsolePretty, nil, int(logger.SeverityWarn)), // WARN+ on the console
		fileSink,
	),
)
```
Roadmap
The Phase 1 set ships in every binding. Phase 2 adds the OTel exporter and a few common production sinks; Phase 3 adds cloud and high-throughput integrations.
| Sink | Phase | Notes |
|---|---|---|
| OTLPSink | 2 | OTLP/gRPC or OTLP/HTTP — the primary production exporter. |
| SyslogSink | 2 | BSD syslog and RFC 5424 transports. |
| SentrySink | 2 | ERROR and above only; body and attrs become a Sentry event. |
| LokiSink | 2 | For deployments without an OTel collector. |
| FluentBitForwardSink | 2 | Fluent Bit Forward protocol — common sidecar pattern. |
| CloudWatchLogsSink | 3 | AWS CloudWatch Logs with batch put. |
| GCPCloudLoggingSink | 3 | Google Cloud Logging with OAuth / service account. |
| KafkaSink | 3 | High-throughput pipelines. |
| ElasticsearchSink | 3 | Elasticsearch bulk API. |
See also
- Wire formats — what bytes each sink emits.
- Configure the logger — full bootstrap walkthrough.
- Implement a custom sink — the Sink protocol from the implementer's side.
- ADR-0001 §7 (full normative text).