跳到主要内容
版本:0.3

Recipe: Weather Agent

A complete, runnable agent that calls a real weather API as a tool. Demonstrates HTTP-calling tools, error handling, streaming UI, and cancellation.

Time to run: 5 minutes. Deps: cubepi, httpx, an ANTHROPIC_API_KEY.

The script

weather_agent.py
import asyncio
import os

import httpx
from pydantic import BaseModel, Field

from cubepi import Agent, AgentTool, AgentToolResult, Model, TextContent
from cubepi.providers.anthropic import AnthropicProvider


# --- The tool -----------------------------------------------------------

class GetWeatherParams(BaseModel):
city: str = Field(..., description="The city to look up weather for")
units: str = Field("metric", pattern="^(metric|imperial)$")


async def get_weather(tool_call_id, params: GetWeatherParams, *, signal=None, on_update=None):
# Free Open-Meteo geocoding + forecast. No API key required.
async with httpx.AsyncClient(timeout=10) as client:
try:
geo = await client.get(
"https://geocoding-api.open-meteo.com/v1/search",
params={"name": params.city, "count": 1, "language": "en"},
)
geo.raise_for_status()
results = geo.json().get("results")
if not results:
return AgentToolResult(
content=[TextContent(text=f"Couldn't find a city named {params.city!r}.")],
is_error=True,
)
lat, lon = results[0]["latitude"], results[0]["longitude"]

wx = await client.get(
"https://api.open-meteo.com/v1/forecast",
params={
"latitude": lat,
"longitude": lon,
"current_weather": True,
"temperature_unit": "celsius" if params.units == "metric" else "fahrenheit",
},
)
wx.raise_for_status()
cw = wx.json()["current_weather"]
unit = "°C" if params.units == "metric" else "°F"
return AgentToolResult(
content=[TextContent(text=f"{cw['temperature']}{unit}, wind {cw['windspeed']} km/h in {params.city}.")],
)
except httpx.HTTPError as e:
return AgentToolResult(
content=[TextContent(text=f"Weather API error: {e}")],
is_error=True,
)


weather_tool = AgentTool(
name="get_weather",
description="Get current weather for a city. Returns a short text summary.",
parameters=GetWeatherParams,
execute=get_weather,
)


# --- The agent ----------------------------------------------------------

async def main():
provider = AnthropicProvider(api_key=os.environ["ANTHROPIC_API_KEY"])
agent = Agent(
provider=provider,
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
system_prompt="You are a concise weather assistant. Always use the tool; don't guess.",
tools=[weather_tool],
)

def on_event(event, signal=None):
if event.type == "message_update" and event.stream_event.type == "text_delta":
print(event.stream_event.delta, end="", flush=True)
elif event.type == "tool_execution_start":
print(f"\n[calling {event.tool_name}({event.args})]")
elif event.type == "agent_end":
print()

agent.subscribe(on_event)

# Wrap prompt() so Ctrl-C cleanly cancels the run.
task = asyncio.create_task(agent.prompt("Weather in Tokyo and São Paulo, please."))
try:
await task
except KeyboardInterrupt:
agent.abort()
await agent.wait_for_idle()


if __name__ == "__main__":
asyncio.run(main())

Run:

pip install cubepi httpx
export ANTHROPIC_API_KEY=sk-…
python weather_agent.py

Sample output:

[calling get_weather({'city': 'Tokyo', 'units': 'metric'})]
[calling get_weather({'city': 'São Paulo', 'units': 'metric'})]
Tokyo is currently 18°C with a wind speed of 12 km/h. São Paulo is 25°C with winds of 9 km/h.

What's going on

  • Two tools in parallel. The model emits two get_weather tool calls in the same assistant turn. CubePi runs them concurrently — the second one doesn't wait for the first.
  • Streaming text + tool events interleave. The on_event filter handles both text_delta (for the final answer) and tool_execution_start (for the "thinking" indicator).
  • Errors are tool results, not exceptions. A bad city or network hiccup returns is_error=True; the model gets the error message and usually retries with a different spelling.
  • Cancellation is clean. agent.abort() propagates through signal into in-flight tools and the provider stream.

Extending this recipe

  • Add caching: memoize the geocoding lookup by city — Open-Meteo's coordinates are stable.
  • Add retries: wrap with RetryMiddleware to handle transient API errors.
  • Persist conversations: add a SQLiteCheckpointer so follow-up questions ("and in Osaka?") have history.

See also