Version: 0.5 (latest)

cubepi.tracing

tracing_context

function

tracing_context(*, tags: list[str] | tuple[str, ...] | None = None, metadata: dict[str, Any] | None = None) -> Iterator[None]

Scope tags / metadata onto agent runs started inside this block.

The recorder reads these contextvars on AgentStartEvent and stamps them on the invoke_agent span as:

  • cubepi.tags — tuple of strings (OTel attribute type)
  • one attribute per metadata key, namespaced under cubepi.metadata.* (e.g. metadata={"user_id": "u-42"} → cubepi.metadata.user_id = "u-42"). The dedicated sub-namespace keeps recorder-owned schema keys (cubepi.run_id, cubepi.turn.index, …) safe from collisions with caller-supplied keys.

Because the values live in contextvars, this works for concurrent agents: each asyncio task tree gets its own value. Nested blocks merge additively (tags concatenate; metadata is merged as a union, with inner keys winning).

Args

  • tags — Tags to apply to runs started in this scope. Stored as a tuple on the span so it round-trips through OTel's attribute serializer.
  • metadata — Per-run key/value pairs. Values must be types that OTel attributes accept (str, bool, int, float, or a tuple/list of those); other shapes will be silently dropped by the recorder.
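Because unsupported values are dropped silently, callers may want to pre-filter metadata themselves. The helper below is a hypothetical sketch (is_attribute_value and drop_unsupported are not cubepi APIs). Besides the types listed above, it also rejects mixed-type sequences, since the OpenTelemetry specification requires array attribute values to be homogeneous.

```python
from typing import Any

# Scalar types that OTel attribute values may have.
_SCALARS = (str, bool, int, float)


def is_attribute_value(value: Any) -> bool:
    if isinstance(value, _SCALARS):
        return True
    if isinstance(value, (list, tuple)):
        # Every element must be a scalar, and all elements one type.
        all_scalar = all(isinstance(v, _SCALARS) for v in value)
        return all_scalar and len({type(v) for v in value}) <= 1
    return False  # dicts, None, objects, ...


def drop_unsupported(metadata: dict[str, Any]) -> dict[str, Any]:
    # Keep only the entries that can be stamped on a span as-is.
    return {k: v for k, v in metadata.items() if is_attribute_value(v)}
```

Usage would look like tracing_context(metadata=drop_unsupported(raw_metadata)), which makes any dropped key visible at the call site instead of inside the recorder.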

JsonlSpanExporter

class

JsonlSpanExporter(self, directory: str | os.PathLike[str] = './cubepi-traces')

Write each ReadableSpan as one JSON line.

Files: <directory>/<YYYY-MM-DD>/<run_id>.jsonl. The date used for the subdirectory is the span's start time (UTC). The run_id comes from the cubepi.run_id attribute set by the cubepi Recorder; spans without that attribute fall back to "unknown-run".

Permissions: files are created mode 0o600 (user-only).

Methods

export

export(spans: Sequence[ReadableSpan]) -> SpanExportResult

shutdown

shutdown() -> None

force_flush

force_flush(timeout_millis: int = 30000) -> bool

Meter

class

Meter(self, *, resource: Resource | None = None, exporters: list[MetricExporter] | None = None, export_interval_millis: int = 60000)

Emit OTel GenAI histograms alongside the cubepi Tracer.

Construct with a list of MetricExporter instances (e.g. the OTLP metric exporter); call attach to start emitting.

Example:

from cubepi.tracing import Tracer, Meter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter,
)

tracer = Tracer(service_name="my-bot", exporters=[...])
meter = Meter(
    resource=tracer.resource,
    exporters=[OTLPMetricExporter(endpoint="http://collector:4318/v1/metrics")],
)
tracer.attach(agent)
meter.attach(agent)

Attributes

  • otel_meter: Any — The underlying opentelemetry.metrics.Meter — exposed so callers can register their own instruments.

Methods

attach

attach(agent: 'Agent') -> Callable[[], None]

Subscribe to agent and start emitting metrics.

Each attach() call gets its own _MeterState, so multiple agents attached concurrently to the same Meter don't clobber each other's open timestamps (in ns) and attribute dicts. Previously all state lived on the Meter instance, so a second agent's provider_request overwrote the first agent's _chat_open_ns / _chat_attrs before the first agent's response arrived.
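The per-attach design can be illustrated with a stdlib-only sketch. SketchMeter, SketchMeterState and the on_provider_request / on_provider_response hooks are hypothetical names. In a real meter, record would be an OpenTelemetry histogram's record call (for example for a GenAI duration histogram such as gen_ai.client.operation.duration), and the single attribute shown is only an example.

```python
import time
from typing import Any, Callable

# Stand-in for a histogram's record(amount, attributes) method.
RecordFn = Callable[[float, dict[str, Any]], None]


class SketchMeterState:
    """Hypothetical per-attach state, modelled on the _MeterState idea."""

    def __init__(self, record: RecordFn) -> None:
        self._record = record
        self._chat_open_ns: int | None = None
        self._chat_attrs: dict[str, Any] = {}

    def on_provider_request(self, model: str) -> None:
        # Remember when this agent's chat call opened, and its attributes.
        self._chat_open_ns = time.monotonic_ns()
        self._chat_attrs = {"gen_ai.request.model": model}

    def on_provider_response(self) -> None:
        if self._chat_open_ns is None:
            return  # no matching request: nothing to time
        elapsed_s = (time.monotonic_ns() - self._chat_open_ns) / 1e9
        self._record(elapsed_s, self._chat_attrs)
        self._chat_open_ns = None


class SketchMeter:
    def __init__(self, record: RecordFn) -> None:
        self._record = record

    def attach(self) -> SketchMeterState:
        # A fresh state object per attach() call, so agents sharing this
        # meter cannot overwrite each other's open timestamps.
        return SketchMeterState(self._record)
```

If the two fields lived on SketchMeter instead, the second on_provider_request would replace the first agent's model and start time, which is exactly the failure described above.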

force_flush

force_flush(timeout_seconds: float = 30.0) -> bool

shutdown

shutdown(timeout_seconds: float = 30.0) -> None

attached

attached(agent: 'Agent') -> AsyncIterator['Meter']

RAII wrapper around attach.

Entering the async with block attaches (creating fresh per-attach state); exiting calls the detach callable that attach returned. This mirrors Tracer.attached; use both together for the cleanest end-to-end shutdown:

async with (
    Tracer(...) as tracer,
    Meter(resource=tracer.resource, ...) as meter,
    tracer.attached(agent),
    meter.attached(agent),
):
    await agent.prompt("...")
# auto: detach both + shutdown both

SCHEMA_URL

attribute

SCHEMA_URL = 'https://opentelemetry.io/schemas/1.41.0'

Tracer

class

Tracer(self, *, service_name: str | None = None, service_version: str | None = None, service_namespace: str | None = None, service_instance_id: str | None = None, deployment_environment: str | None = None, agent_name: str | None = None, agent_id: str | None = None, agent_description: str | None = None, agent_version: str | None = None, exporters: list[SpanExporter] | None = None, record_content: bool = False, redact: 'Callable[[str, Any], Any] | None' = None, resource: Resource | None = None, atexit_flush: bool = True, atexit_flush_timeout_seconds: float = 5.0)

Attaches OTel-compatible tracing to a cubepi Agent.

Construct once per process (or per service). Each attach(agent) call wires the cubepi recorder to the agent's event stream and provider listener registry; the returned callable detaches.

Example

from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter

tracer = Tracer(
    service_name="my-bot",
    agent_name="coding-agent",
    exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
)
detach = tracer.attach(agent)
try:
    ...
finally:
    detach()
    await tracer.shutdown()

Attributes

  • resource: Resource
  • otel_tracer: Any — The underlying opentelemetry.trace.Tracer instance.
  • redact: 'Callable[[str, Any], Any] | None' — Optional (key, value) -> value filter applied at every set_attribute site for content attributes.
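A redact callable only has to match the (key, value) -> value shape documented above. The function below is a hypothetical example that scrubs e-mail addresses from string content. Which attribute keys the recorder passes in is not documented here, so the sketch ignores the key.

```python
import re
from typing import Any

# Matches common e-mail addresses; deliberately simple, not RFC 5322-complete.
_EMAIL = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def redact(key: str, value: Any) -> Any:
    # The key is ignored here; a stricter filter could special-case keys.
    if isinstance(value, str):
        return _EMAIL.sub("[email]", value)
    if isinstance(value, (list, tuple)):
        # Preserve the container type while scrubbing each item.
        return type(value)(redact(key, item) for item in value)
    return value  # numbers, bools, etc. pass through unchanged
```

It would be passed as Tracer(..., record_content=True, redact=redact). The assumption here is that redact only matters when record_content=True, since content attributes are presumably not recorded by default.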

Methods

attach

attach(agent: 'Agent') -> Callable[[], Any]

Wire the cubepi recorder to agent.

Returns a detach() callable. When invoked:

  • Synchronously: unsubscribes every hook, closes any spans still open from a cancelled run, and sweeps MCP tool-span registrations. These effects are visible as soon as detach() returns.
  • Schedules a flush on the running event loop and returns the resulting asyncio.Task, so callers can await detach() to block until buffered spans have been exported. Outside an async context (no running event loop) it returns None; flush separately with await tracer.shutdown().

The caller's finally block must use await detach(), await tracer.shutdown(), or both: calling detach() synchronously does not, by itself, guarantee that ended spans have left the BatchSpanProcessor.

attach() also registers this Tracer's private TracerProvider with cubepi.mcp._tracing so that MCP CLIENT spans flow through the same exporters. Without this step, the MCP module falls back to the OTel global default provider, which is a no-op unless the caller separately called trace.set_tracer_provider. The detach callable unregisters the provider.
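The sync-then-async contract of the returned callable can be sketched as follows. make_detach, unsubscribe and flush are hypothetical stand-ins, not cubepi internals; the sketch only shows how a detach callable can return an asyncio.Task inside a running event loop and None outside one.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any, Optional

FlushFn = Callable[[], Coroutine[Any, Any, None]]


def make_detach(
    unsubscribe: Callable[[], None], flush: FlushFn
) -> Callable[[], Optional["asyncio.Task[None]"]]:
    def detach() -> Optional["asyncio.Task[None]"]:
        # Synchronous part: the hooks are gone as soon as detach() returns.
        unsubscribe()
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop: nothing to schedule the flush on,
            # so the caller has to flush later (e.g. tracer.shutdown()).
            return None
        # Async part: hand back the flush Task so callers can await it.
        return loop.create_task(flush())

    return detach
```

The flush coroutine is only created once a loop is known to be running, so the no-loop path never leaves an un-awaited coroutine behind.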

force_flush

force_flush(timeout_seconds: float = 30.0) -> bool

Block until all currently buffered spans are exported.

Returns False on timeout.

shutdown

shutdown(timeout_seconds: float = 30.0) -> None

Flush and close all exporters. Tracer is unusable after this.

attached

attached(agent: 'Agent') -> AsyncIterator['Tracer']

RAII wrapper around attach.

Entering the async with block attaches the recorder; exiting runs detach and awaits the flush Task it returns (if any). Equivalent to:

detach = tracer.attach(agent)
try:
    ...
finally:
    result = detach()
    if result is not None:  # async context: it's a Task
        await result

Use this instead of the bare attach / finally: detach() pattern when you want cleanup tied to a single async with block. It also composes with Tracer's own context manager:

async with Tracer(...) as tracer, tracer.attached(agent):
    await agent.prompt("...")
# auto: detach (closes cancelled spans) + shutdown (flush + close)
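The try / finally equivalence maps directly onto contextlib.asynccontextmanager. This is a sketch rather than cubepi's source: attached_sketch is a hypothetical name, and it takes the attach callable explicitly so it can be exercised without a real Tracer or Agent.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable


@asynccontextmanager
async def attached_sketch(
    attach: Callable[[Any], Callable[[], Any]], agent: Any
) -> AsyncIterator[None]:
    detach = attach(agent)
    try:
        yield
    finally:
        # Runs on normal exit and when the body raises.
        result = detach()
        if result is not None:  # a flush Task when a loop is running
            await result
```

Because cleanup sits in finally, an exception raised inside the block still detaches and flushes before propagating.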

trace

function

trace(tracer: 'Tracer | None', agent: 'Agent') -> AsyncIterator[None]

Best-effort tracing scope for one agent run.

Attaches tracer to agent on enter and detaches + flushes its buffered spans on exit. Every tracing fault — a failed attach, detach, or flush — is logged and swallowed, so tracing can never break or fail the work inside the async with block. Passing tracer=None makes the block a no-op, which lets callers gate tracing on config without branching at the call site.

This does not shut the tracer down: the tracer is reusable across runs, so build it once (e.g. per process) and call await tracer.shutdown() when the owning process stops.

Unlike Tracer.attached, which surfaces flush failures to the caller, this helper swallows them — use it when tracing is auxiliary to the work and must never affect its outcome.

Example

async with trace(tracer, agent):
    await agent.prompt("...")
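The fault-isolation behaviour described above can be sketched as follows. best_effort_trace is a hypothetical name and this is not cubepi's source. In particular, it assumes the flush is the Task returned by detach(), whereas the real helper may flush through a different path.

```python
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

log = logging.getLogger(__name__)


@asynccontextmanager
async def best_effort_trace(tracer: Any, agent: Any) -> AsyncIterator[None]:
    # tracer may be None, in which case the block is a plain no-op scope.
    if tracer is None:
        yield
        return
    detach = None
    try:
        detach = tracer.attach(agent)
    except Exception:
        log.exception("tracing: attach failed; running untraced")
    try:
        yield  # exceptions from the caller's own work propagate unchanged
    finally:
        if detach is not None:
            try:
                result = detach()
                if result is not None:  # flush Task in an async context
                    await result
            except Exception:
                log.exception("tracing: detach/flush failed")
        # Deliberately no tracer.shutdown(): the tracer outlives this run.
```

Only tracing faults are caught; an error raised by the work inside the block still reaches the caller.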
