Python API — overview

The dagstack-logger package implements the logger contract for Python (3.11+). Phase 1 (currently published as v0.1.x on PyPI) ships the core API plus three sinks; Phase 2 will add the OTLP exporter and a LogProcessor chain.

Public exports

The package's __all__ exposes:

Symbol                Kind        Module
Logger                class       dagstack.logger.logger
LogRecord             dataclass   dagstack.logger.records
Resource              dataclass   dagstack.logger.records
InstrumentationScope  dataclass   dagstack.logger.records
Severity              IntEnum     dagstack.logger.severity
Sink                  Protocol    dagstack.logger.sinks
ConsoleSink           class       dagstack.logger.sinks
FileSink              class       dagstack.logger.sinks
InMemorySink          class       dagstack.logger.sinks
Subscription          dataclass   dagstack.logger.subscription
configure             function    dagstack.logger.configuration
__version__           str         dagstack.logger._version

Import everything from the top level:

from dagstack.logger import (
    Logger,
    LogRecord,
    Resource,
    InstrumentationScope,
    Severity,
    Sink,
    ConsoleSink,
    FileSink,
    InMemorySink,
    Subscription,
    configure,
)

configure(...) — global bootstrap

def configure(
    *,
    root_level: str | int = "INFO",
    sinks: list[Sink] | None = None,
    per_logger_levels: dict[str, str | int] | None = None,
    resource_attributes: dict[str, Value] | None = None,
) -> None

Bootstrap the global logger state. Sets the root logger's severity floor, attaches sinks, applies per-logger overrides, and seeds the OTel Resource with process-level attributes. Call once at application startup, before any business code calls Logger.get(name).

See the Configure the logger guide for a full walkthrough.
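Both root_level and per_logger_levels accept either a severity name or a number, so a bootstrap has to reduce the two forms to one. The sketch below illustrates that step in isolation. It is not the package's implementation: normalize_level is a hypothetical helper, the Severity class is a local copy of the bucket constants, and case-insensitive name matching is an assumption.

```python
from enum import IntEnum


class Severity(IntEnum):
    """Local copy of the six bucket baselines so the sketch runs standalone."""
    TRACE = 1
    DEBUG = 5
    INFO = 9
    WARN = 13
    ERROR = 17
    FATAL = 21


def normalize_level(level):
    """Hypothetical helper: turn "INFO" or 9 into a severity number."""
    if isinstance(level, str):
        try:
            # Assumption: names are matched case-insensitively.
            return int(Severity[level.strip().upper()])
        except KeyError:
            raise ValueError(f"unknown severity name: {level!r}") from None
    number = int(level)
    if not 1 <= number <= 24:
        raise ValueError(f"severity number out of range 1-24: {number}")
    return number


# The kind of values configure(...) accepts, reduced to plain numbers.
root = normalize_level("INFO")
overrides = {
    name: normalize_level(level)
    for name, level in {"app.db": "debug", "app.http": 13}.items()
}
```

Here root ends up as the INFO baseline and overrides maps each (made-up) logger name to a number, which is the shape a severity floor comparison needs.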

Logger — named logger registry

class Logger:
    @classmethod
    def get(cls, name: str = "", version: str | None = None) -> Logger: ...

    # Severity emits
    def trace(self, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def debug(self, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def info(self, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def warn(self, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def error(self, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def fatal(self, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def log(self, severity_number: int, body: Value, *, attributes: Mapping[str, Value] | None = None) -> None: ...
    def exception(self, err: BaseException, *, body: Value | None = None, attributes: Mapping[str, Value] | None = None) -> None: ...

    # Configuration mutators
    def set_sinks(self, sinks: list[Sink]) -> None: ...
    def set_min_severity(self, severity_number: int) -> None: ...
    def set_resource(self, resource: Resource | None) -> None: ...

    # Scoped overrides
    def with_sinks(self, sinks: list[Sink]) -> Logger: ...
    def append_sinks(self, extra_sinks: list[Sink]) -> Logger: ...
    def without_sinks(self) -> Logger: ...
    def scope_sinks(self, sinks: list[Sink]) -> AbstractContextManager[Logger]: ...
    def child(self, attributes: Mapping[str, Value]) -> Logger: ...

    # Lifecycle
    def flush(self, timeout: float = 5.0) -> None: ...
    def close(self) -> None: ...

    # Introspection
    def effective_sinks(self) -> list[Sink]: ...
    def effective_min_severity(self) -> int: ...
    def effective_resource(self) -> Resource | None: ...

    # Subscription (Phase 1: inactive)
    def on_reconfigure(self, callback: Callable[[], None]) -> Subscription: ...

Logger instances are constructed only via Logger.get(name, version); the registry caches one instance per name. Loggers returned by child(...), with_sinks(...), append_sinks(...), and without_sinks(...) are detached (non-cached): they inherit configuration from their parent but do not participate in the global registry.
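The difference between cached and detached loggers can be shown with a toy registry. This is a sketch of the behaviour described above, not the real Logger class: MiniLogger, its parent attribute, and the merging of child attributes are all assumptions made for illustration.

```python
from __future__ import annotations


class MiniLogger:
    """Toy stand-in for the registry behaviour; not the real Logger."""

    _registry: dict[str, "MiniLogger"] = {}

    def __init__(self, name: str, attributes: dict | None = None,
                 parent: MiniLogger | None = None) -> None:
        self.name = name
        self.attributes = dict(attributes or {})
        self.parent = parent

    @classmethod
    def get(cls, name: str = "") -> MiniLogger:
        # Cached: the first call creates the instance, later calls reuse it.
        if name not in cls._registry:
            cls._registry[name] = cls(name)
        return cls._registry[name]

    def child(self, attributes: dict) -> MiniLogger:
        # Detached: built fresh on every call and never stored in the registry.
        # Assumption: the child keeps the parent's name and merges attributes.
        merged = {**self.attributes, **attributes}
        return MiniLogger(self.name, merged, parent=self)


a = MiniLogger.get("app.db")
b = MiniLogger.get("app.db")
c = a.child({"request_id": "r-1"})
```

After this runs, a and b are the same object, while c is a separate instance that points back to a and is invisible to later get("app.db") calls.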

Severity — bucket constants

class Severity(IntEnum):
    TRACE = 1
    DEBUG = 5
    INFO = 9
    WARN = 13
    ERROR = 17
    FATAL = 21

Severity defines the six bucket-baseline values; for example, int(Severity.INFO) == 9. See the Severity reference table for the full 1-24 enumeration.
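Since the baselines sit four apart across the 1-24 range, any severity number can be mapped back to its bucket with integer arithmetic. A minimal sketch follows; bucket_of is a hypothetical helper, not part of the package, and the Severity class is a local copy so the block runs on its own.

```python
from enum import IntEnum


class Severity(IntEnum):
    """Local copy of the bucket baselines listed above."""
    TRACE = 1
    DEBUG = 5
    INFO = 9
    WARN = 13
    ERROR = 17
    FATAL = 21


def bucket_of(severity_number: int) -> Severity:
    """Hypothetical helper: map a number in 1-24 to its bucket baseline."""
    if not 1 <= severity_number <= 24:
        raise ValueError(f"severity number out of range 1-24: {severity_number}")
    # Each bucket covers four consecutive numbers starting at its baseline.
    baseline = ((severity_number - 1) // 4) * 4 + 1
    return Severity(baseline)


info_bucket = bucket_of(10)    # 10 falls in the 9-12 range
fatal_bucket = bucket_of(24)   # 24 is the top of the 21-24 range
```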

Sinks

Three concrete sinks plus the Sink protocol:

class Sink(Protocol):
    id: str
    def emit(self, record: LogRecord) -> None: ...
    def flush(self, timeout: float = 5.0) -> None: ...
    def close(self) -> None: ...
    def supports_severity(self, severity_number: int) -> bool: ...


class ConsoleSink:
    def __init__(self, *, mode: Literal["auto", "json", "pretty"] = "auto",
                 stream: TextIO | None = None,
                 min_severity: int = 1) -> None: ...


class FileSink:
    def __init__(self, path: str | Path, *,
                 max_bytes: int = 0, keep: int = 0,
                 min_severity: int = 1) -> None: ...


class InMemorySink:
    def __init__(self, *, capacity: int = 1000, min_severity: int = 1) -> None: ...
    def records(self) -> list[LogRecord]: ...
    def clear(self) -> None: ...
    @property
    def capacity(self) -> int: ...

See Sinks for usage and the custom-sink guide for protocol implementation.
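To make the protocol shape concrete, here is a minimal sketch of a class that satisfies it structurally. Everything in the block is a local stand-in so it runs without the package: Record replaces LogRecord, the Sink protocol is re-declared, and CountingSink is a hypothetical example. Whether the logger or the sink is responsible for calling supports_severity is not stated above; this sketch assumes the sink filters in emit.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass
class Record:
    """Minimal stand-in for LogRecord: only the fields this sketch reads."""
    severity_number: int
    body: Any


@runtime_checkable
class Sink(Protocol):
    """Local copy of the protocol shape documented above."""
    id: str
    def emit(self, record: Record) -> None: ...
    def flush(self, timeout: float = 5.0) -> None: ...
    def close(self) -> None: ...
    def supports_severity(self, severity_number: int) -> bool: ...


class CountingSink:
    """Hypothetical sink that counts accepted records per severity number."""

    def __init__(self, *, min_severity: int = 1) -> None:
        self.id = "counting"
        self.min_severity = min_severity
        self.counts: dict[int, int] = {}
        self.closed = False

    def supports_severity(self, severity_number: int) -> bool:
        return severity_number >= self.min_severity

    def emit(self, record: Record) -> None:
        # Assumption: the sink drops records below its floor or after close().
        if self.closed or not self.supports_severity(record.severity_number):
            return
        number = record.severity_number
        self.counts[number] = self.counts.get(number, 0) + 1

    def flush(self, timeout: float = 5.0) -> None:
        pass  # nothing is buffered, so there is nothing to flush

    def close(self) -> None:
        self.closed = True


sink = CountingSink(min_severity=9)
for number in (5, 9, 17, 17):
    sink.emit(Record(severity_number=number, body="msg"))
```

No inheritance is needed: CountingSink never subclasses Sink, yet it passes an isinstance check against the runtime-checkable protocol because it has all the required members.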

LogRecord — the typed dataclass

@dataclass(slots=True)
class LogRecord:
    time_unix_nano: int
    severity_number: int
    severity_text: str
    body: Value
    attributes: dict[str, Value] = field(default_factory=dict)
    instrumentation_scope: InstrumentationScope | None = None
    resource: Resource | None = None
    trace_id: bytes | None = None
    span_id: bytes | None = None
    trace_flags: int = 0
    observed_time_unix_nano: int | None = None

Per spec §1, the field set matches the OTel Log Data Model v1.24. See LogRecord fields for the full table with semantics and ownership.

Subscription — Phase 1 inactive

Subscription mirrors the config-spec Subscription dataclass, with the fields path, active, and inactive_reason. In Phase 1 the logger does not subscribe to runtime config changes: Logger.on_reconfigure(callback) returns an inactive subscription with inactive_reason="Phase 1 logger does not support watch-based reconfigure" and emits a one-time warning. Phase 2 will activate the watch path.
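The inactive-subscription behaviour can be sketched as follows. This is a stand-alone illustration, not the package's code: the field types, the empty path value, and the use of the standard warnings module for the one-time warning are all assumptions, and on_reconfigure is written here as a free function rather than a Logger method.

```python
from __future__ import annotations

import warnings
from dataclasses import dataclass
from typing import Callable

REASON = "Phase 1 logger does not support watch-based reconfigure"
_state = {"warned": False}  # remembers whether the warning was already issued


@dataclass(frozen=True)
class Subscription:
    """Local stand-in with the three documented fields (types assumed)."""
    path: str
    active: bool
    inactive_reason: str | None = None


def on_reconfigure(callback: Callable[[], None]) -> Subscription:
    # The callback is accepted but never invoked, since nothing is watched.
    if not _state["warned"]:
        warnings.warn(REASON, RuntimeWarning, stacklevel=2)
        _state["warned"] = True
    return Subscription(path="", active=False, inactive_reason=REASON)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    first = on_reconfigure(lambda: None)
    second = on_reconfigure(lambda: None)
```

Callers that check first.active before relying on the callback keep working unchanged once a later phase returns active subscriptions.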

See also