Building Your First Agent
This guide is the longer cousin of the Quick Start. We'll build a single-tool agent end-to-end, then layer on the things you'll want next: streaming UI, error handling, and a cancel button.
Step 1 โ set up the provider and modelโ
The provider is the connection to the LLM API; the Model describes
which model you want to invoke and a few capabilities.
import os
from cubepi import Model
from cubepi.providers.anthropic import AnthropicProvider
provider = AnthropicProvider(api_key=os.environ["ANTHROPIC_API_KEY"])
model = Model(
id="claude-sonnet-4-5-20250929",
provider="anthropic",
max_tokens=4096, # response cap
context_window=200_000, # hard model limit; defaults are usually fine
temperature=0.7,
)
The provider field on Model is a string label (e.g. "anthropic",
"openai"). It's used by the framework to clamp thinking levels and
tag responses โ keep it stable, it doesn't have to match the
provider's internal name.
Step 2 โ declare a toolโ
Every tool is a Pydantic param model + an async execute function:
from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent
class GetWeatherParams(BaseModel):
city: str
async def get_weather(tool_call_id, params: GetWeatherParams, *, signal=None, on_update=None):
# do real work here โ call an HTTP API, query a DB, etc.
return AgentToolResult(
content=[TextContent(text=f"72ยฐF and sunny in {params.city}")]
)
weather_tool = AgentTool(
name="get_weather",
description="Get current weather for a city. Returns a short text summary.",
parameters=GetWeatherParams,
execute=get_weather,
)
A few details:
- The Pydantic model is auto-converted to JSON Schema and sent to the model as part of the tool definition.
- The
executesignature is fixed:(tool_call_id, params, *, signal, on_update). The two keyword-only args are always passed โ keep them in your signature even if you ignore them. signalis anasyncio.Eventthat's set when the user cancels. Check it inside long-running work and bail out early.on_update(partial)lets you stream incremental progress (covered in Tool Use).
Step 3 โ assemble the agentโ
from cubepi import Agent
agent = Agent(
provider=provider,
model=model,
system_prompt="You are a concise weather assistant.",
tools=[weather_tool],
)
You can pass tools=[] (or omit it) for a plain chat agent.
Step 4 โ subscribe to eventsโ
agent.subscribe(listener) is how you observe the run. The listener
receives every AgentEvent:
def on_event(event, signal=None):
if event.type == "message_update" and event.stream_event.type == "text_delta":
print(event.stream_event.delta, end="", flush=True)
elif event.type == "tool_execution_start":
print(f"\nโ calling {event.tool_name}({event.args})")
elif event.type == "tool_execution_end":
print(f" โ done")
agent.subscribe(on_event)
You can register multiple listeners and they all receive every event.
Subscribe before prompt() โ events fire as soon as the run
starts.
Step 5 โ prompt and runโ
import asyncio
async def main():
await agent.prompt("What's the weather in Tokyo?")
asyncio.run(main())
agent.prompt() does not return any value. The result lives on
agent.state.messages (the full history) and agent.state.streaming_message
(the current in-flight message, or None between turns).
Adding error handlingโ
When provider.stream() raises, the agent loop still produces an
AssistantMessage with stop_reason="error" and error_message
filled in. The event sequence is:
message_start โ message_end โ turn_end โ agent_end.
You can either:
-
Catch in the subscriber, looking for
event.type == "agent_end"and the last message'sstop_reason:def on_event(event, signal=None):if event.type == "agent_end":last = event.messages[-1]if getattr(last, "stop_reason", "") == "error":print(f"\nerror: {last.error_message}") -
Or inspect
agent.state.error_messageafterawait agent.prompt(...)returns.
Adding a cancel buttonโ
agent.abort() sets the run-level signal. The provider stream
short-circuits to "aborted", in-flight tools see signal.is_set() == True, and the loop emits agent_end cleanly.
async def main():
task = asyncio.create_task(agent.prompt("Search forโฆ"))
await asyncio.sleep(0.5)
agent.abort()
await task # always completes โ never raises
await agent.wait_for_idle()
Common pitfallsโ
RuntimeError: Agent is already processing a prompt.โ You calledprompt()twice without awaiting the first. Useawait agent.wait_for_idle()or queue withsteer()/follow_up()instead.- No
text_deltaevents โ Did you subscribe before callingprompt()? Listeners only see events emitted after registration. - Tool not found โ The model invoked a tool whose
namedoesn't match any tool intools=[...]. CubePi reports this as a tool result withis_error=Truerather than crashing โ check thetool_execution_endevent'sresult. - Pydantic ValidationError swallowed โ If the model produces malformed JSON, CubePi captures the validation error and feeds it back as a tool error result. The model usually corrects itself on the next turn.
Nextโ
- Tool Use & Parallel Execution โ multiple tools at once,
sequential mode,
terminate, incremental progress. - Streaming Events โ the full event taxonomy.
- Multi-turn Conversations โ keeping state across
turns,
steer,follow_up,resume.