Core Concepts
Six concepts cover everything CubePi does. Read this page once, and the rest of the docs becomes a lookup table.
Agent
Agent is the stateful façade: you construct it with a provider, a
model, optional tools, and optional middleware/checkpointer. You drive
it through three methods:
- await agent.prompt(message): start a new turn from a user message.
- await agent.resume(): continue from the last persisted message (used with a checkpointer).
- agent.steer(message) / agent.follow_up(message): queue a message mid-flight or after the current run.
The agent owns an AgentState (system prompt, tools, model, message
history, pending tool calls, streaming flag) and a list of subscribers:
unsubscribe = agent.subscribe(my_listener)
# ...
unsubscribe()
Subscribers receive every AgentEvent the loop emits. They can be
sync or async.
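The subscribe/unsubscribe contract can be sketched without CubePi at all. TinyEmitter below is a hypothetical stand-in, not the library's implementation; it only assumes that a listener may be a plain function or a coroutine function, and that subscribe() returns a callable that removes the listener.

```python
import asyncio
import inspect
from typing import Any, Callable


class TinyEmitter:
    """Hypothetical stand-in for the agent's subscriber list (not CubePi code)."""

    def __init__(self) -> None:
        self._listeners: list[Callable[[Any], Any]] = []

    def subscribe(self, listener: Callable[[Any], Any]) -> Callable[[], None]:
        self._listeners.append(listener)

        def unsubscribe() -> None:
            # Safe to call more than once.
            if listener in self._listeners:
                self._listeners.remove(listener)

        return unsubscribe

    async def emit(self, event: Any) -> None:
        # Iterate over a copy so a listener may unsubscribe during delivery.
        for listener in list(self._listeners):
            result = listener(event)
            # Async listeners return an awaitable; sync listeners return None.
            if inspect.isawaitable(result):
                await result


async def demo() -> list[str]:
    seen: list[str] = []
    emitter = TinyEmitter()

    def sync_listener(event: str) -> None:
        seen.append(f"sync:{event}")

    async def async_listener(event: str) -> None:
        seen.append(f"async:{event}")

    stop_sync = emitter.subscribe(sync_listener)
    emitter.subscribe(async_listener)

    await emitter.emit("agent_start")
    stop_sync()  # only the async listener remains subscribed
    await emitter.emit("agent_end")
    return seen
```

Running asyncio.run(demo()) delivers the first event to both listeners and the second only to the async one.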
Tool
An AgentTool is a name + description + Pydantic parameter model +
async execute function:
from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent


class SearchParams(BaseModel):
    query: str
    limit: int = 10


async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    # do work; respect `signal` if you can be aborted
    return AgentToolResult(content=[TextContent(text=f"…")])


search = AgentTool(
    name="search",
    description="Search the corpus",
    parameters=SearchParams,
    execute=execute,
)
The Pydantic schema is auto-converted to JSON Schema and passed to the
model. Arg parsing, error wrapping, and parallel execution are
handled by the framework. See Tool Use
for execution_mode, on_update (incremental progress), and
terminate (end the turn from a tool).
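To make "error wrapping and parallel execution" concrete, here is a minimal sketch of what a framework might do with a batch of tool calls. It is not CubePi's code: SketchResult, run_one, and run_parallel are hypothetical names, params is a plain dict rather than a Pydantic model, and the execute functions take only two arguments.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchResult:
    """Hypothetical result record; CubePi's AgentToolResult may look different."""
    tool_call_id: str
    text: str
    is_error: bool = False


async def run_one(tool_call_id, execute, params) -> SketchResult:
    try:
        text = await execute(tool_call_id, params)
        return SketchResult(tool_call_id, text)
    except Exception as exc:
        # Wrap the failure so the caller gets an error result instead of a crash.
        return SketchResult(tool_call_id, f"{type(exc).__name__}: {exc}", is_error=True)


async def run_parallel(calls) -> list[SketchResult]:
    # gather() runs the calls concurrently and preserves input order.
    return await asyncio.gather(*(run_one(cid, fn, p) for cid, fn, p in calls))


async def ok_tool(tool_call_id, params):
    await asyncio.sleep(0.01)  # simulate I/O
    return f"found {params['query']}"


async def bad_tool(tool_call_id, params):
    raise ValueError("boom")
```

Because every call is wrapped individually, one failing tool does not cancel the others in the same batch.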
Provider
A Provider is anything matching this Protocol:
class Provider(Protocol):
    async def stream(
        self,
        model: Model,
        messages: list[Message],
        *,
        system_prompt: str = "",
        tools: list[ToolDefinition] | None = None,
        options: StreamOptions | None = None,
    ) -> MessageStream: ...
It returns a MessageStream: a single async iterator that yields
StreamEvents and exposes the final AssistantMessage via await stream.result(). Built-in providers:
- AnthropicProvider: Claude (Messages API, with thinking, caching, tool use).
- OpenAIProvider: GPT family (Chat Completions API).
- OpenAIResponsesProvider: GPT family (Responses API, server-side state).
- FauxProvider: deterministic test double (no network).
Write your own by implementing one method. See Providers / Custom.
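The shape of that contract (one stream() method returning an async iterator with a result()) can be sketched with stand-in types. Everything here is hypothetical: SketchEvent, SketchStream, and EchoProvider are not CubePi classes, messages are plain strings, and the final result is a string rather than an AssistantMessage.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchEvent:
    type: str       # e.g. "start", "text_delta", "done"
    text: str = ""


class SketchStream:
    """Async iterator plus result(); a stand-in for MessageStream, not the real class."""

    def __init__(self, chunks):
        self._parts = []
        self._done = False
        self._events = self._generate(chunks)

    async def _generate(self, chunks):
        yield SketchEvent("start")
        for chunk in chunks:
            self._parts.append(chunk)
            yield SketchEvent("text_delta", chunk)
        self._done = True
        yield SketchEvent("done")

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._events.__anext__()

    async def result(self) -> str:
        # Drain any events the caller did not consume, then return the final text.
        if not self._done:
            async for _ in self:
                pass
        return "".join(self._parts)


class EchoProvider:
    """Hypothetical provider: one stream() method, no network."""

    async def stream(self, model, messages, *, system_prompt="", tools=None, options=None):
        # Echo the last message back, one word per delta.
        words = messages[-1].split(" ")
        chunks = [w if i == 0 else " " + w for i, w in enumerate(words)]
        return SketchStream(chunks)


async def demo():
    stream = await EchoProvider().stream("any-model", ["hello there world"])
    kinds = [event.type async for event in stream]
    return kinds, await stream.result()
```

A real provider would translate its vendor's wire events into the library's event types; the iterator-plus-result shape is the part this sketch is meant to show.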
Stream and events
Streams and events are two layers:
- Provider streams: MessageStream yields the provider events start, text_start, text_delta, text_end, thinking_*, toolcall_*, done, and error. This is the raw token stream.
- Agent events: what agent.subscribe(...) receives. Eleven types covering the entire loop: agent_start, agent_end, turn_start, turn_end, message_start, message_update, message_end, tool_execution_start, tool_execution_update, tool_execution_end. message_update wraps the provider event inside event.stream_event.
Subscribe to agent events for UI; for low-level token routing dig into
event.stream_event. See Streaming Events.
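A listener that routes on both layers might look like the sketch below. The dataclasses are stand-ins for the real event objects: the type attribute and the delta field are assumptions made for illustration, and only stream_event comes from this page.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchStreamEvent:
    type: str           # provider-level event name, e.g. "text_delta"
    delta: str = ""     # assumed field name for the streamed text


@dataclass
class SketchAgentEvent:
    type: str           # agent-level event name, e.g. "message_update"
    stream_event: Optional[SketchStreamEvent] = None


class TranscriptListener:
    """Collects streamed text and counts tool runs."""

    def __init__(self) -> None:
        self.text = ""
        self.tool_runs = 0

    def __call__(self, event: SketchAgentEvent) -> None:
        if event.type == "message_update" and event.stream_event is not None:
            # Low-level routing: look inside the wrapped provider event.
            inner = event.stream_event
            if inner.type == "text_delta":
                self.text += inner.delta
        elif event.type == "tool_execution_start":
            self.tool_runs += 1
        # Every other event type is ignored by this listener.


def replay(listener, events):
    for event in events:
        listener(event)
    return listener
```

A UI would typically react to the outer event types and only reach into stream_event when it needs per-token detail.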
Middleware
Middleware is a class with up to seven typed hooks:
| Hook | When it runs | Composition rule |
|---|---|---|
| transform_context | Before each model call, on the message list | Chained: each receives the previous result |
| convert_to_llm | Right before serialisation to the provider | Last implementation wins |
| transform_system_prompt | Before each model call, on the system prompt | Chained |
| before_tool_call | Per tool call, after arg validation | First block=True short-circuits |
| after_tool_call | Per tool call, after execute | Later overrides earlier |
| after_model_response | After the assistant message lands | Returns a TurnAction controlling flow |
| should_stop_after_turn | At each turn boundary | Any True stops |
Pass middleware as a list to Agent(middleware=[...]). See
Middleware → Composition.
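Four of the composition rules in the table reduce to a few lines each. The helpers below are illustrative only: they are not CubePi's compose_middleware(), they use plain synchronous functions where the real hooks are likely async methods, and a blocking decision is modelled as a dict with a "block" key, which is an assumption.

```python
def chain(hooks, value):
    """Chained: each hook receives the previous hook's result."""
    for hook in hooks:
        value = hook(value)
    return value


def last_wins(hooks, value):
    """Last implementation wins: only the final hook in the list is used."""
    return hooks[-1](value) if hooks else value


def first_block(hooks, call):
    """The first decision with block=True short-circuits; later hooks never run."""
    for hook in hooks:
        decision = hook(call)
        if decision and decision.get("block"):
            return decision
    return None


def any_true(hooks, state):
    """Any True stops: a single hook voting to stop is enough."""
    return any(hook(state) for hook in hooks)


# Example: two transform_context-style hooks applied in list order.
trimmed = chain(
    [lambda msgs: msgs[-2:], lambda msgs: ["summary"] + msgs],
    ["m1", "m2", "m3"],
)
```

Order matters for the chained and short-circuit rules, so the position of a middleware in the list is part of its behaviour.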
Checkpointer
A Checkpointer is anything matching:
class Checkpointer(Protocol):
    async def load(self, thread_id: str) -> CheckpointData | None: ...
    async def append(self, thread_id: str, messages: list[Message]) -> None: ...
    async def save_extra(self, thread_id: str, extra: dict) -> None: ...
Bind one to an agent with Agent(checkpointer=cp, thread_id="…") and
the loop will append each new message as it lands, restoring history
on the first prompt(). Built-in backends: MemoryCheckpointer,
SQLiteCheckpointer, PostgresCheckpointer. See
Checkpointing → SQLite.
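A custom backend only needs those three methods. The dict-backed sketch below shows the idea. SketchCheckpointData is a hypothetical shape, because the real CheckpointData fields are not shown on this page, and save_extra is assumed to replace the stored dict rather than merge into it.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class SketchCheckpointData:
    """Hypothetical shape; the real CheckpointData may differ."""
    messages: list = field(default_factory=list)
    extra: dict = field(default_factory=dict)


class DictCheckpointer:
    """In-process store keyed by thread_id (illustrative, not MemoryCheckpointer)."""

    def __init__(self) -> None:
        self._threads: dict[str, SketchCheckpointData] = {}

    async def load(self, thread_id: str):
        data = self._threads.get(thread_id)
        if data is None:
            return None  # unknown thread: nothing to restore
        # Return copies so callers cannot mutate stored history by accident.
        return SketchCheckpointData(list(data.messages), dict(data.extra))

    async def append(self, thread_id: str, messages: list) -> None:
        entry = self._threads.setdefault(thread_id, SketchCheckpointData())
        entry.messages.extend(messages)

    async def save_extra(self, thread_id: str, extra: dict) -> None:
        entry = self._threads.setdefault(thread_id, SketchCheckpointData())
        entry.extra = dict(extra)  # assumption: replace, not merge


async def demo():
    cp = DictCheckpointer()
    missing = await cp.load("t1")
    await cp.append("t1", ["user: hi"])
    await cp.append("t1", ["assistant: hello"])
    await cp.save_extra("t1", {"title": "greeting"})
    return missing, await cp.load("t1")
```

Appending message by message, rather than rewriting the whole history, is what lets a run be resumed from the last persisted message after a crash.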
Putting it together
User code
    │
    ▼
┌──────────────────────────────────────────┐
│ Agent                                    │
│ ├─ AgentState (messages, tools, …)       │
│ ├─ Middleware ── compose_middleware()    │
│ ├─ Checkpointer ── append on message_end │
│ └─ run_agent_loop ◄── the actual loop    │
│       │                                  │
│       ▼                                  │
│    Provider.stream() → MessageStream     │
│       │                                  │
│       └─ events → emit → subscribers     │
└──────────────────────────────────────────┘
That diagram is the whole framework. The rest of this site is just the details.