Version: 0.4

Core Concepts

Six concepts cover everything CubePi does. Read this page once, then the rest of the docs become a lookup table.

Agent

Agent is the stateful façade. You construct it with a provider, a model, optional tools, and an optional middleware list and checkpointer. You drive it through the following methods:

  • await agent.prompt(message): start a new turn from a user message.
  • await agent.resume(): continue from the last persisted message (used with a checkpointer).
  • agent.steer(message) / agent.follow_up(message): queue a message mid-flight or after the current run.

The agent owns an AgentState (system prompt, tools, model, message history, pending tool calls, streaming flag) and a list of subscribers:

unsubscribe = agent.subscribe(my_listener)
# ...
unsubscribe()

Subscribers receive every AgentEvent the loop emits. They can be sync or async.
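One way a loop could deliver events to both sync and async subscribers is sketched below. This is a stand-alone illustration, not CubePi's implementation: the Emitter class and its emit method are hypothetical names, and only the subscribe-returns-unsubscribe shape comes from the text above.

```python
import asyncio
import inspect
from typing import Any, Callable


class Emitter:
    """Hypothetical stand-in for the agent's subscriber list."""

    def __init__(self) -> None:
        self._listeners: list[Callable[[Any], Any]] = []

    def subscribe(self, listener: Callable[[Any], Any]) -> Callable[[], None]:
        self._listeners.append(listener)

        def unsubscribe() -> None:
            # Calling unsubscribe twice is harmless.
            if listener in self._listeners:
                self._listeners.remove(listener)

        return unsubscribe

    async def emit(self, event: Any) -> None:
        # Iterate over a copy so a listener may unsubscribe during dispatch.
        for listener in list(self._listeners):
            result = listener(event)
            if inspect.isawaitable(result):
                # Async listeners return a coroutine; await it in order.
                await result


async def demo() -> list[str]:
    seen: list[str] = []

    def sync_listener(event: str) -> None:
        seen.append(f"sync:{event}")

    async def async_listener(event: str) -> None:
        await asyncio.sleep(0)
        seen.append(f"async:{event}")

    emitter = Emitter()
    unsubscribe = emitter.subscribe(sync_listener)
    emitter.subscribe(async_listener)

    await emitter.emit("agent_start")
    unsubscribe()  # the sync listener stops receiving events from here on
    await emitter.emit("agent_end")
    return seen
```

Awaiting each listener in turn keeps event order deterministic; a real loop might instead schedule slow listeners as background tasks.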

Tool

An AgentTool is a name + description + Pydantic parameter model + async execute function:

from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent

class SearchParams(BaseModel):
    query: str
    limit: int = 10

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    # do work; respect `signal` if you can be aborted
    return AgentToolResult(content=[TextContent(text=f"…")])

search = AgentTool(
    name="search",
    description="Search the corpus",
    parameters=SearchParams,
    execute=execute,
)

The Pydantic schema is auto-converted to JSON Schema and passed to the model. Arg parsing, error wrapping, and parallel execution are handled by the framework. See Tool Use for execution_mode, on_update (incremental progress), and terminate (end the turn from a tool).
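To make "error wrapping and parallel execution" concrete, here is a rough sketch of what such a layer could look like using only asyncio. Everything in it is an assumption for illustration: ToolOutcome, run_one, and run_parallel are hypothetical names, arguments are plain dicts rather than validated Pydantic models, and the framework's real behaviour may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ToolOutcome:
    """Hypothetical result record; not a CubePi type."""
    tool_call_id: str
    text: str
    is_error: bool = False


ExecuteFn = Callable[[str, dict[str, Any]], Awaitable[str]]


async def run_one(tool_call_id: str, execute: ExecuteFn, args: dict[str, Any]) -> ToolOutcome:
    try:
        return ToolOutcome(tool_call_id, await execute(tool_call_id, args))
    except Exception as exc:
        # Wrap the failure so it becomes an error result instead of crashing the run.
        return ToolOutcome(tool_call_id, f"{type(exc).__name__}: {exc}", is_error=True)


async def run_parallel(calls: list[tuple[str, ExecuteFn, dict[str, Any]]]) -> list[ToolOutcome]:
    # gather runs the calls concurrently and returns results in input order.
    return list(await asyncio.gather(*(run_one(cid, fn, args) for cid, fn, args in calls)))


async def search(tool_call_id: str, args: dict[str, Any]) -> str:
    await asyncio.sleep(0.01)  # pretend to do I/O
    return f"{args['limit']} hits for {args['query']!r}"


async def broken(tool_call_id: str, args: dict[str, Any]) -> str:
    raise ValueError("corpus unavailable")


async def demo() -> list[ToolOutcome]:
    return await run_parallel([
        ("call-1", search, {"query": "cats", "limit": 3}),
        ("call-2", broken, {}),
    ])
```

The failing call does not cancel its sibling: each call is wrapped individually, so one tool's exception surfaces as an error outcome next to the other tool's normal result.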

Provider

A Provider is anything matching this Protocol:

class Provider(Protocol):
    async def stream(
        self,
        model: Model,
        messages: list[Message],
        *,
        system_prompt: str = "",
        tools: list[ToolDefinition] | None = None,
        options: StreamOptions | None = None,
    ) -> MessageStream: ...

It returns a MessageStream, a single async iterator that yields StreamEvents and exposes the final AssistantMessage via await stream.result(). Built-in providers:

  • AnthropicProvider: Claude (Messages API, with thinking, caching, tool use).
  • OpenAIProvider: GPT family (Chat Completions API).
  • OpenAIResponsesProvider: GPT family (Responses API, server-side state).
  • FauxProvider: deterministic test double (no network).

Write your own by implementing one method. See Providers / Custom.
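The shape of a one-method provider can be sketched with stand-in types. Nothing below imports CubePi: Event, EchoStream, and EchoProvider are hypothetical, messages are plain strings, and the final result is a string rather than an AssistantMessage. Only the stream(...) signature and the "async iterator plus await result()" contract follow the Protocol above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for a StreamEvent; the real type may carry more fields."""
    type: str
    text: str = ""


class EchoStream:
    """Stand-in for MessageStream: async-iterable, with an awaitable result()."""

    def __init__(self, reply: str) -> None:
        self._words = reply.split()
        self._final = reply

    async def __aiter__(self):
        # An async generator used as __aiter__ yields events one by one.
        yield Event("start")
        yield Event("text_start")
        for word in self._words:
            await asyncio.sleep(0)  # simulate waiting on the network
            yield Event("text_delta", word)
        yield Event("text_end")
        yield Event("done")

    async def result(self) -> str:
        return self._final


class EchoProvider:
    """Toy provider that echoes the last message back."""

    async def stream(self, model, messages, *, system_prompt="", tools=None, options=None):
        last = messages[-1] if messages else ""
        return EchoStream(f"you said: {last}")


async def demo():
    stream = await EchoProvider().stream("echo-1", ["hello world"])
    deltas = [event.text async for event in stream if event.type == "text_delta"]
    return deltas, await stream.result()
```

A caller that only wants the final message can skip iteration and await result() directly; in this sketch the result does not depend on the stream having been consumed, which a real provider may not guarantee.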

Stream and events

Streams and events are two layers:

  • Provider streams: MessageStream yields provider events (start, text_start, text_delta, text_end, thinking_*, toolcall_*, done, error). This is the raw token stream.
  • Agent events: what agent.subscribe(...) receives. The types listed here cover the entire loop: agent_start, agent_end, turn_start, turn_end, message_start, message_update, message_end, tool_execution_start, tool_execution_update, tool_execution_end. message_update wraps the provider event inside event.stream_event.

Subscribe to agent events to drive a UI; for low-level token routing, read event.stream_event. See Streaming Events.
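The two layers can be illustrated with a listener that treats most agent events as lifecycle signals and reaches into stream_event only for text deltas. The dataclasses are stand-ins, not CubePi's types, and the delta field name is an assumption; only the event type names and the stream_event attribute come from the list above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StreamEvent:
    """Stand-in for a provider event; `delta` is an assumed field name."""
    type: str
    delta: str = ""


@dataclass
class AgentEvent:
    """Stand-in for an agent event; only `stream_event` is named in the docs."""
    type: str
    stream_event: Optional[StreamEvent] = None


@dataclass
class TextCollector:
    chunks: list = field(default_factory=list)
    lifecycle: list = field(default_factory=list)

    def __call__(self, event: AgentEvent) -> None:
        inner = event.stream_event
        if event.type == "message_update" and inner is not None:
            # Low-level routing: only text deltas reach the output buffer.
            if inner.type == "text_delta":
                self.chunks.append(inner.delta)
        else:
            # Everything else is a coarse lifecycle signal, enough for a UI.
            self.lifecycle.append(event.type)

    @property
    def text(self) -> str:
        return "".join(self.chunks)


def demo() -> TextCollector:
    listener = TextCollector()
    events = [
        AgentEvent("agent_start"),
        AgentEvent("message_start"),
        AgentEvent("message_update", StreamEvent("text_start")),
        AgentEvent("message_update", StreamEvent("text_delta", "Hel")),
        AgentEvent("message_update", StreamEvent("text_delta", "lo")),
        AgentEvent("message_update", StreamEvent("text_end")),
        AgentEvent("message_end"),
        AgentEvent("agent_end"),
    ]
    for event in events:
        listener(event)
    return listener
```

With the real library, an object like this would be passed to agent.subscribe(...); here the events are fed in by hand so the sketch runs on its own.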

Middleware

Middleware is a class with up to seven typed hooks:

Hook                     When it runs                                   Composition rule
transform_context        Before each model call, on the message list    Chained: each receives the previous result
convert_to_llm           Right before serialisation to the provider     Last implementation wins
transform_system_prompt  Before each model call, on the system prompt   Chained
before_tool_call         Per tool call, after arg validation            First block=True short-circuits
after_tool_call          Per tool call, after execute                   Later hooks override earlier ones
after_model_response     After the assistant message lands              Returns a TurnAction controlling flow
should_stop_after_turn   At each turn boundary                          Any True stops

Pass middleware as a list to Agent(middleware=[...]). See Middleware → Composition.
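Two of the composition rules in the table, "chained" and "any True stops", can be sketched as plain functions over a middleware list. The hook signatures, the async style, and the helper names chain_transform_context and any_should_stop are all assumptions made for this illustration; the actual hook parameters are not specified on this page.

```python
import asyncio


class TrimHistory:
    """Hypothetical middleware: keep only the most recent messages."""

    def __init__(self, keep: int) -> None:
        self.keep = keep

    async def transform_context(self, messages: list) -> list:
        return messages[-self.keep:]


class AddReminder:
    """Hypothetical middleware: append a reminder to the context."""

    async def transform_context(self, messages: list) -> list:
        return messages + ["reminder: be concise"]


class MaxTurns:
    """Hypothetical middleware: stop once a turn limit is reached."""

    def __init__(self, limit: int) -> None:
        self.limit = limit

    async def should_stop_after_turn(self, turn_index: int) -> bool:
        return turn_index + 1 >= self.limit


async def chain_transform_context(middleware: list, messages: list) -> list:
    # Chained: each hook receives the previous hook's result.
    for mw in middleware:
        hook = getattr(mw, "transform_context", None)
        if hook is not None:
            messages = await hook(messages)
    return messages


async def any_should_stop(middleware: list, turn_index: int) -> bool:
    # Any True stops: the first middleware that votes to stop wins.
    for mw in middleware:
        hook = getattr(mw, "should_stop_after_turn", None)
        if hook is not None and await hook(turn_index):
            return True
    return False


async def demo():
    stack = [TrimHistory(keep=2), AddReminder(), MaxTurns(limit=3)]
    context = await chain_transform_context(stack, ["a", "b", "c"])
    early = await any_should_stop(stack, turn_index=1)
    late = await any_should_stop(stack, turn_index=2)
    return context, early, late
```

Order matters for chained hooks: with AddReminder placed before TrimHistory, the reminder would count toward the two kept messages.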

Checkpointer

A Checkpointer is anything matching:

class Checkpointer(Protocol):
    async def load(self, thread_id: str) -> CheckpointData | None: ...
    async def append(self, thread_id: str, messages: list[Message]) -> None: ...
    async def save_extra(self, thread_id: str, extra: dict) -> None: ...

Bind one to an agent with Agent(checkpointer=cp, thread_id="…"). The loop then appends each new message as it lands and restores history on the first prompt(). Built-in backends: MemoryCheckpointer, SQLiteCheckpointer, PostgresCheckpointer. See Checkpointing → SQLite.
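A dict-backed object that satisfies the three-method Protocol might look like the sketch below. It is not the built-in MemoryCheckpointer: DictCheckpointer is a hypothetical name, the CheckpointData fields (messages, extra) are assumed, messages are plain strings, and whether save_extra replaces or merges is a guess (this version replaces).

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CheckpointData:
    """Stand-in for the real CheckpointData; both fields are assumptions."""
    messages: list = field(default_factory=list)
    extra: dict = field(default_factory=dict)


class DictCheckpointer:
    """Keeps every thread in a process-local dict; data is lost on exit."""

    def __init__(self) -> None:
        self._threads: dict[str, CheckpointData] = {}

    async def load(self, thread_id: str) -> Optional[CheckpointData]:
        # Unknown threads return None, matching the Protocol's return type.
        return self._threads.get(thread_id)

    async def append(self, thread_id: str, messages: list) -> None:
        data = self._threads.setdefault(thread_id, CheckpointData())
        data.messages.extend(messages)

    async def save_extra(self, thread_id: str, extra: dict) -> None:
        data = self._threads.setdefault(thread_id, CheckpointData())
        data.extra = dict(extra)  # assumption: replace rather than merge


async def demo() -> Optional[CheckpointData]:
    cp = DictCheckpointer()
    await cp.append("t1", ["user: hi"])
    await cp.append("t1", ["assistant: hello"])
    await cp.save_extra("t1", {"title": "greeting"})
    return await cp.load("t1")
```

Because append only ever extends the stored list, a persistent backend can implement it as an insert per message rather than rewriting the whole history.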

Tracer (optional)

Tracer produces OpenTelemetry spans aligned with the GenAI Semantic Conventions, so any OTLP backend (Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, …) can ingest agent runs without custom instrumentation. Install the extra:

pip install "cubepi[tracing]" # OTel SDK
pip install "cubepi[tracing-otlp]" # + OTLP/HTTP exporter

then wrap your agent in an async with:

from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter

async with (
    Tracer(
        service_name="my-bot",
        agent_name="assistant",
        exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
    ) as tracer,
    tracer.attached(agent),
):
    await agent.prompt("…")

Each run emits an invoke_agent root span containing one cubepi.turn per LLM round-trip, plus chat (CLIENT) and execute_tool children. By default no prompt content or model output is recorded; opt in with Tracer(record_content=True) and a redact callback for PII. Pair with Meter(...) for token / duration / TTFC histograms. Full guide: Tracing → Overview.

Putting it together

User code
   │
   ▼
┌────────────────────────────────────────────┐
│ Agent                                      │
│  ├─ AgentState (messages, tools, …)        │
│  ├─ Middleware ── compose_middleware()     │
│  ├─ Checkpointer ── append on message_end  │
│  └─ run_agent_loop ◀──── the actual loop   │
│        │                                   │
│        ▼                                   │
│  Provider.stream() → MessageStream         │
│        │                                   │
│        └─ events → emit → subscribers      │
└────────────────────────────────────────────┘

That diagram is the whole framework. The rest of this site is just the details.