# Quick start
dagstack/logger is an OpenTelemetry-compatible structured logging contract for the dagstack ecosystem. It standardises:
- The wire format — a `LogRecord` that matches the OTel Log Data Model v1.24.
- Severity — the numeric range 1-24 with six canonical text values (`TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`, `FATAL`).
- Sinks — pluggable destinations that share one `Sink` protocol (Phase 1 ships `ConsoleSink`, `FileSink`, `InMemorySink`).
- Context propagation — `trace_id`, `span_id`, and W3C Baggage entries are injected into every record automatically.
- Redaction — attribute values whose keys end in `_key`, `_secret`, `_token`, `_password`, `_passphrase`, or `_credentials` are masked at emit time.
- Scoped overrides — a single agent run, test case, or audit endpoint can swap sinks for the duration of a block.
- AI-agent observability — an optional convention pack with OTel GenAI conformance for LLM, tool, and retrieval observability.
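The key-suffix redaction rule can be sketched in a few lines of plain Python. This is an illustration of the convention, not the package's implementation; the names `REDACTED_SUFFIXES` and `mask_attributes` are invented for the example.

```python
# Sketch of the key-suffix redaction convention: values of keys ending
# in a sensitive suffix are replaced with a fixed mask at emit time.
# These names are illustrative, not the dagstack/logger API.
REDACTED_SUFFIXES = ("_key", "_secret", "_token", "_password", "_passphrase", "_credentials")

def mask_attributes(attributes: dict) -> dict:
    """Return a copy of the attributes with sensitive values masked."""
    return {
        key: "***" if key.endswith(REDACTED_SUFFIXES) else value
        for key, value in attributes.items()
    }
```

So `mask_attributes({"api_key": "sk-123", "user.id": 42})` masks only `api_key` and leaves `user.id` untouched.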
:::info Release status
All three bindings ship Phase 1 v0.1.x: `dagstack-logger` on PyPI, `@dagstack/logger` on npmjs.org, and `go.dagstack.dev/logger` (a vanity URL backed by github.com/dagstack/logger-go).
:::
## Installation
Python:

```bash
pip install dagstack-logger
```

TypeScript:

```bash
npm install @dagstack/logger
```

Go:

```bash
go get go.dagstack.dev/logger
```
## Your first log line
Bootstrap the global logger once at application startup, then call `Logger.get(name)` from anywhere in the codebase:
Python:

```python
from dagstack.logger import Logger, ConsoleSink, configure

configure(
    root_level="INFO",
    sinks=[ConsoleSink(mode="auto")],
    resource_attributes={"service.name": "order-service"},
)

logger = Logger.get("order_service.api", version="1.0.0")
logger.info("request received", attributes={"request.id": "req-abc", "user.id": 42})
```

TypeScript:

```typescript
import { Logger, ConsoleSink, configure } from "@dagstack/logger";

configure({
  rootLevel: "INFO",
  sinks: [new ConsoleSink({ mode: "auto" })],
  resourceAttributes: { "service.name": "order-service" },
});

const logger = Logger.get("order_service.api", "1.0.0");
logger.info("request received", { "request.id": "req-abc", "user.id": 42 });
```

Go:

```go
import (
    "go.dagstack.dev/logger"
)

logger.Configure(
    logger.WithRootLevel("INFO"),
    logger.WithSinks(logger.NewConsoleSink(logger.ConsoleAuto, nil, 1)),
    logger.WithResourceAttributes(logger.Attrs{"service.name": "order-service"}),
)

log := logger.GetVersioned("order_service.api", "1.0.0")
log.Info("request received", logger.Attrs{"request.id": "req-abc", "user.id": 42})
```
`ConsoleSink(mode="auto")` chooses pretty coloured output when stderr is a TTY and JSON-lines otherwise (so containerised stdout capture, `jq`, and fluent-bit see structured records, while a developer's terminal stays readable).
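The auto-mode decision can be approximated like this (a sketch of the behaviour described above, not the sink's actual internals; `resolve_console_mode` is an invented name):

```python
import sys

def resolve_console_mode(mode: str = "auto") -> str:
    """Approximate the console sink's mode selection: pretty, coloured
    output for an interactive terminal; JSON-lines whenever output is
    piped or captured (containers, jq, log shippers)."""
    if mode != "auto":
        return mode  # explicit "pretty" or "json" wins
    return "pretty" if sys.stderr.isatty() else "json"
```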
## Adding sinks
The `configure()` call accepts a list of sinks. Phase 1 ships three:
Python:

```python
from dagstack.logger import ConsoleSink, FileSink, InMemorySink, configure

configure(
    root_level="INFO",
    sinks=[
        ConsoleSink(mode="json"),
        FileSink("/var/log/order-service.jsonl", max_bytes=100_000_000, keep=10),
    ],
    resource_attributes={
        "service.name": "order-service",
        "service.version": "1.0.0",
        "deployment.environment": "production",
    },
)
```

TypeScript:

```typescript
import { ConsoleSink, FileSink, InMemorySink, configure } from "@dagstack/logger";

configure({
  rootLevel: "INFO",
  sinks: [
    new ConsoleSink({ mode: "json" }),
    new FileSink("/var/log/order-service.jsonl", { maxBytes: 100_000_000, keep: 10 }),
  ],
  resourceAttributes: {
    "service.name": "order-service",
    "service.version": "1.0.0",
    "deployment.environment": "production",
  },
});
```

Go:

```go
fileSink, err := logger.NewFileSink("/var/log/order-service.jsonl", 100_000_000, 10, 1)
if err != nil {
    // handle file open error
}

logger.Configure(
    logger.WithRootLevel("INFO"),
    logger.WithSinks(
        logger.NewConsoleSink(logger.ConsoleJSON, nil, 1),
        fileSink,
    ),
    logger.WithResourceAttributes(logger.Attrs{
        "service.name": "order-service",
        "service.version": "1.0.0",
        "deployment.environment": "production",
    }),
)
```
`FileSink` writes Canonical JSON-lines and rotates by file size; `InMemorySink` is a ring buffer for tests. Each sink applies its own `min_severity` filter independently of the others (see Sinks).
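The independent per-sink filter boils down to a severity-number comparison. The OTel Log Data Model assigns each text level a band of four numbers; the helper below is a sketch of that check, not the package's code:

```python
# OTel Log Data Model severity numbers: each text level opens a band
# of four (TRACE=1-4, DEBUG=5-8, INFO=9-12, WARN=13-16, ERROR=17-20,
# FATAL=21-24). The dict maps each level to the start of its band.
SEVERITY_NUMBER = {"TRACE": 1, "DEBUG": 5, "INFO": 9, "WARN": 13, "ERROR": 17, "FATAL": 21}

def sink_accepts(min_severity: str, record_severity: str) -> bool:
    """A sink drops records below its own threshold, regardless of
    what any other configured sink accepts."""
    return SEVERITY_NUMBER[record_severity] >= SEVERITY_NUMBER[min_severity]
```

A console sink filtering at `WARN` therefore ignores the `INFO` records that a file sink filtering at `DEBUG` still writes.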
## Logging exceptions
Use the `exception` method to capture an active error with OTel `exception.*` attributes:
Python:

```python
try:
    process_order(order_id)
except OrderValidationError as err:
    logger.exception(err, attributes={"order.id": order_id})
```

TypeScript:

```typescript
try {
  await processOrder(orderId);
} catch (err) {
  logger.exception(err, { attributes: { "order.id": orderId } });
}
```

Go:

```go
if err := processOrder(ctx, orderID); err != nil {
    log.ExceptionCtx(ctx, err, nil, logger.Attrs{"order.id": orderID})
}
```
The record is emitted at ERROR severity. The `exception.type`, `exception.message`, and `exception.stacktrace` attributes are populated automatically per the OTel `exception.*` semantic conventions.
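In Python terms, the attribute mapping amounts to something like the sketch below (an illustration of the convention using the standard library; `exception_attributes` is an invented helper, not the package API):

```python
import traceback

def exception_attributes(err: BaseException) -> dict:
    """Build OTel exception.* attributes from a caught error: the
    exception class name, its message, and the formatted traceback."""
    return {
        "exception.type": type(err).__qualname__,
        "exception.message": str(err),
        "exception.stacktrace": "".join(
            traceback.format_exception(type(err), err, err.__traceback__)
        ),
    }
```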
## Capturing logs in tests
Use `InMemorySink` plus a scoped override to capture only the records emitted inside a block:
Python:

```python
from dagstack.logger import InMemorySink, Logger

sink = InMemorySink(capacity=100)
logger = Logger.get("test_module")

with logger.scope_sinks([sink]):
    run_business_logic()

records = sink.records()
assert any(r.body == "operation completed" for r in records)
```

TypeScript:

```typescript
import { InMemorySink, Logger } from "@dagstack/logger";

const sink = new InMemorySink({ capacity: 100 });
const logger = Logger.get("test_module");

await logger.scopeSinks([sink], async (scoped) => {
  await runBusinessLogic();
});

const records = sink.records();
if (!records.some((r) => r.body === "operation completed")) {
  throw new Error("expected record not captured");
}
```

Go:

```go
sink := logger.NewInMemorySink(100, 1)
log := logger.Get("test_module")

err := log.ScopeSinks(ctx, []logger.Sink{sink}, func(ctx context.Context) error {
    runBusinessLogic(ctx)
    return nil
})
if err != nil {
    // handle
}

records := sink.Records()
// assert any record's Body matches "operation completed"
```
The override affects only records emitted through the scoped logger and its children; any logger obtained via the global `Logger.get(name)` keeps writing to its configured sinks. See the Testing guide for full assertions.
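The swap-and-restore behaviour can be sketched with a context manager (a minimal illustration of the scoping idea, using plain lists as stand-in sinks; `SketchLogger` is not the real class):

```python
from contextlib import contextmanager

class SketchLogger:
    """Minimal illustration of scoped sink overrides."""

    def __init__(self, sinks):
        self._sinks = list(sinks)

    @contextmanager
    def scope_sinks(self, sinks):
        # Swap sinks for the duration of the block, then restore the
        # originals, even if the block raises.
        previous, self._sinks = self._sinks, list(sinks)
        try:
            yield self
        finally:
            self._sinks = previous

    def info(self, body):
        for sink in self._sinks:  # here a "sink" is just a list
            sink.append(body)
```

Records emitted inside the `with` block land only in the override sink; after the block exits, emits flow to the original sinks again.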
## Which applications fit
dagstack/logger is domain-agnostic. It works equally well for:
- Web and API services — request logging, latency events, error tracking.
- Data pipelines — job lifecycle, batch progress, throughput metrics.
- Workflow orchestrators — operation hierarchy, retry events, run audit.
- AI / RAG platforms — LLM call tracing, tool dispatch, token accounting (see the AI-agent extension pack).
- Notification systems — delivery attempts, provider responses, dead-letter handling.
- Billing / payment services — transaction events, redaction of card data, audit trails.
The mechanics are identical: define a logger name per module, configure sinks once, emit structured records, and let context propagation tie them to traces.
## What to read next
Concepts — the model behind the logger:
- Severity, Sinks, Context propagation.
- Operations and typed events, Redaction, Scoped overrides.
- Wire formats, AI-agent observability.
Guides — how to solve typical tasks:
Reference — precise tables:
Specification — normative decisions:
API reference:
- Python — generated from the `dagstack-logger` package source.
- TypeScript — generated from the `@dagstack/logger` package source.
- Go — generated from the `go.dagstack.dev/logger` package source.