Recipe: Resumable Long Tasks
When an agent is mid-flight through a long-running operation (a series
of tool calls, a multi-turn reasoning session) and the process dies,
you want to come back and pick up where it left off, not start over.
CubePi's append-only checkpointing plus agent.resume() makes this
trivial between turns; for resumption mid-tool, you need a little
more care.
Time to run: 15 minutes.
Deps: cubepi[sqlite], an ANTHROPIC_API_KEY.
The pattern
There are three crash points to think about:
- Between turns: The model has answered, there are no tools to run, and the loop is between iterations. resume() re-invokes the model only if a steer or follow-up is queued; with nothing queued it raises (see Common pitfalls). Free with the checkpointer.
- After tool results, before the model call: The tool results are persisted. resume() sees that the last message is a ToolResultMessage and re-invokes the model. Free with the checkpointer.
- Mid-tool: The tool started but didn't finish. Nothing is persisted yet (CubePi only persists messages), so you need tool-internal idempotency. Requires care.
This recipe focuses on case 3, the mid-tool crash.
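After a restart you can tell the three cases apart by looking at the persisted tail of the thread. The sketch below is illustrative only: Msg is a hypothetical stand-in, not CubePi's AssistantMessage / ToolResultMessage / UserMessage classes, and it assumes an assistant message lists the IDs of the tool calls it requested while each tool result carries the ID it answers. Adapt the field names to the real types.

```python
from dataclasses import dataclass, field
from typing import Literal, Optional


# Hypothetical stand-in for a persisted message; not CubePi's classes.
@dataclass
class Msg:
    role: Literal["user", "assistant", "tool_result"]
    tool_call_ids: list[str] = field(default_factory=list)  # calls an assistant turn requested
    tool_call_id: Optional[str] = None  # which call a tool result answers


def crash_case(messages: list[Msg]) -> str:
    """Classify where a run stopped, using only the persisted messages."""
    if not messages:
        return "empty"
    if messages[-1].role == "user":
        return "pending_user_message"  # the model was never called for this input
    # Walk back to the most recent assistant turn.
    asst = next(
        (i for i in range(len(messages) - 1, -1, -1) if messages[i].role == "assistant"),
        None,
    )
    if asst is None:
        raise ValueError("tool results without a preceding assistant turn")
    requested = set(messages[asst].tool_call_ids)
    answered = {m.tool_call_id for m in messages[asst + 1:] if m.role == "tool_result"}
    if requested - answered:
        return "mid_tool"  # case 3: at least one tool call has no persisted result
    if requested:
        return "after_tool_results"  # case 2: all results saved, model not re-invoked yet
    return "between_turns"  # case 1: a plain assistant answer is the last message
```

Only the mid_tool outcome needs the idempotency work described in the rest of this recipe.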
Idempotent tools with external state
The pattern: each tool action has a deterministic, idempotent key. Before doing the work, check whether it's been done.
```python
# tools.py
import json
import os
from pathlib import Path

from pydantic import BaseModel

from cubepi import AgentTool, AgentToolResult, TextContent

# Simple file-backed job store; replace with Redis / Postgres in prod.
JOB_DIR = Path(os.environ.get("JOB_DIR", "/tmp/cubepi-jobs"))
JOB_DIR.mkdir(parents=True, exist_ok=True)


class TranscodeParams(BaseModel):
    source_path: str
    output_path: str


async def transcode_video(tool_call_id, params: TranscodeParams, *, signal=None, on_update=None):
    # Deterministic key: the same arguments always map to the same marker file.
    job_key = f"transcode:{params.source_path}->{params.output_path}"
    job_file = JOB_DIR / f"{job_key.replace('/', '_')}.json"

    if job_file.exists():
        # Already done in a previous run.
        state = json.loads(job_file.read_text())
        return AgentToolResult(
            content=[TextContent(text=f"Already transcoded to {state['output_path']}.")],
            details=state,
        )

    # Do the actual work (long-running, expensive).
    # Use signal to abort cleanly if cancelled.
    # run_ffmpeg is your own async ffmpeg wrapper; it is not defined here or in CubePi.
    output = await run_ffmpeg(params.source_path, params.output_path, signal=signal)

    # Commit the job-done marker AFTER the work succeeds. Write a temp file and
    # rename it, so a crash mid-write can't leave a truncated marker behind.
    tmp_file = job_file.with_suffix(".tmp")
    tmp_file.write_text(json.dumps({"output_path": output}))
    os.replace(tmp_file, job_file)

    return AgentToolResult(
        content=[TextContent(text=f"Transcoded to {output}.")],
        details={"output_path": output},
    )


transcode_tool = AgentTool(
    name="transcode_video",
    description="Transcode a video file. Idempotent; safe to retry.",
    parameters=TranscodeParams,
    execute=transcode_video,
    execution_mode="sequential",  # one transcode at a time
)
```
Now if the process dies during run_ffmpeg, the next agent run sees
job_file.exists() == False, redoes the work, and only writes the
marker on success. If the process dies after the marker was
written, the next run sees the marker, returns the cached result
immediately, and the agent continues as if it had just finished.
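One weakness of building the file name with job_key.replace('/', '_') is that distinct jobs can collide: a source path of a/b.mov and one of a_b.mov map to the same marker. A sketch of a collision-resistant variant, assuming you are free to choose the marker layout: hash a canonical JSON encoding of the tool name and arguments, and write the marker atomically. job_key and run_once here are hypothetical helpers, not CubePi API, and the work callable is synchronous for brevity (in an async tool you would await the work before writing the marker).

```python
import hashlib
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Callable


def job_key(tool_name: str, args: dict[str, Any]) -> str:
    """Same tool + same args -> same key, independent of dict ordering."""
    canonical = json.dumps(
        {"tool": tool_name, "args": args}, sort_keys=True, separators=(",", ":")
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def run_once(job_dir: Path, tool_name: str, args: dict[str, Any],
             work: Callable[[], dict[str, Any]]) -> dict[str, Any]:
    """Run `work` unless a completion marker for these args already exists."""
    marker = job_dir / f"{job_key(tool_name, args)}.json"
    if marker.exists():
        return json.loads(marker.read_text())  # finished in an earlier run
    # If the process dies inside work(), no marker exists and the next run redoes it.
    result = work()
    # Temp file in the same directory, then rename: os.replace is atomic on POSIX
    # filesystems, so readers see either no marker or a complete one.
    fd, tmp = tempfile.mkstemp(dir=job_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(result, f)
    os.replace(tmp, marker)
    return result
```

Hashing trades readable marker names for uniqueness; if you need to inspect jobs by hand, store the original arguments inside the marker's JSON as well.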
Resuming the agent
```python
# resume.py
import asyncio
import os
import sys

from cubepi import Agent, Model
from cubepi.checkpointer import SQLiteCheckpointer
from cubepi.providers.anthropic import AnthropicProvider

from tools import transcode_tool  # the wrapped AgentTool from above


async def main(thread_id: str, initial_prompt: str | None):
    async with SQLiteCheckpointer("jobs.db") as cp:
        agent = Agent(
            provider=AnthropicProvider(api_key=os.environ["ANTHROPIC_API_KEY"]),
            model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
            system_prompt="You orchestrate video transcoding jobs.",
            tools=[transcode_tool],
            checkpointer=cp,
            thread_id=thread_id,
        )
        # Subscriber that ignores every event; replace it with your own handler.
        agent.subscribe(lambda e, s=None: None)

        if initial_prompt:
            # Fresh run. prompt() auto-loads history on first call before
            # appending the new user message.
            await agent.prompt(initial_prompt)
        else:
            # Resume. agent.resume() does NOT auto-load; only prompt() does.
            # Hydrate the agent state manually first.
            data = await cp.load(thread_id)
            if data is None:
                raise RuntimeError(f"No saved state for thread {thread_id!r}")
            agent.state.messages = list(data.messages)
            # `extra` is restored too; it's private on Agent, so use the
            # checkpointer's view if your middleware reads it.
            # Resume picks up from the last persisted message:
            #   ToolResultMessage / UserMessage -> re-invokes the model
            #   AssistantMessage with no queued steer/follow_up -> raises
            await agent.resume()


if __name__ == "__main__":
    thread_id = sys.argv[1]
    initial = sys.argv[2] if len(sys.argv) > 2 else None
    asyncio.run(main(thread_id, initial))
```
Workflow:
```shell
# Start a job:
python resume.py job-1 "Transcode /videos/a.mov to /out/a.mp4 and /videos/b.mov to /out/b.mp4"
# Kill it mid-flight with Ctrl-C.
# Resume; the agent picks up from the last persisted message:
python resume.py job-1
```
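To picture what hydrating before resume() involves, here is a toy append-only message log on the standard-library sqlite3 module. It is an illustration only: the table layout and the open_log / append / load helpers are assumptions, not CubePi's SQLiteCheckpointer schema or API. The shape is the point: each finished message is appended and never updated, and loading a thread returns its messages in order, or None if the thread was never started.

```python
import json
import sqlite3
from typing import Any, Optional


def open_log(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS messages ("
        " thread_id TEXT NOT NULL,"
        " seq INTEGER NOT NULL,"
        " body TEXT NOT NULL,"
        " PRIMARY KEY (thread_id, seq))"
    )
    return conn


def append(conn: sqlite3.Connection, thread_id: str, message: dict[str, Any]) -> None:
    """Append one finished message; existing rows are never rewritten (single writer assumed)."""
    with conn:  # commits on success, rolls back on error
        (next_seq,) = conn.execute(
            "SELECT COALESCE(MAX(seq), -1) + 1 FROM messages WHERE thread_id = ?",
            (thread_id,),
        ).fetchone()
        conn.execute(
            "INSERT INTO messages (thread_id, seq, body) VALUES (?, ?, ?)",
            (thread_id, next_seq, json.dumps(message)),
        )


def load(conn: sqlite3.Connection, thread_id: str) -> Optional[list[dict[str, Any]]]:
    """All messages for a thread in append order, or None for an unknown thread."""
    rows = conn.execute(
        "SELECT body FROM messages WHERE thread_id = ? ORDER BY seq", (thread_id,)
    ).fetchall()
    return [json.loads(body) for (body,) in rows] or None
```

A crash can only lose the message that was in flight; everything appended before it survives, which is why resuming is a matter of loading the list and handing it back to the agent, as the else: branch of main() does.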
The three resume scenarios in code
```python
async def smart_resume(agent, cp, thread_id):
    # resume() doesn't auto-load; hydrate the agent first if its state is empty.
    if not agent.state.messages:
        data = await cp.load(thread_id)
        if data is None or not data.messages:
            return False  # nothing to resume from
        agent.state.messages = list(data.messages)

    last = agent.state.messages[-1]
    last_role = type(last).__name__

    if last_role == "AssistantMessage":
        # Either the run finished naturally, or it died right after a
        # turn ended. resume() raises unless there's queued steering.
        # Easiest path: ask the user what's next.
        return False

    # Last is UserMessage or ToolResultMessage: safe to resume.
    await agent.resume()
    return True
```
Persistence + abort
agent.abort() triggers a clean teardown that emits agent_end. The
last fully persisted message is whatever made it through
message_end. Aborts during a tool's execution don't persist the
tool result (the tool didn't return), so resume() will re-dispatch
the model with the last AssistantMessage containing the unfinished
ToolCall. The model will usually re-issue the call, and your
idempotency guards handle the rest.
What about persisting partial tool state?
CubePi doesn't expose a "persist a partial tool result" API. The
intended pattern is to keep partial state in the tool's own
external store (filesystem, Redis, S3), keyed deterministically by the
tool args. transcode_video above uses the simplest form of this: a
completion marker in JOB_DIR, keyed by the source and output paths.
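When a tool works through many independent units, it can record progress as well as completion, so a restart skips finished units instead of redoing the whole job. A minimal sketch, assuming the work splits into numbered chunks that are each safe to redo; process_chunks and the progress-file layout are hypothetical, not CubePi API. In a real tool the progress file would live in the job store, named after the job key.

```python
import json
import os
from pathlib import Path
from typing import Callable


def process_chunks(progress_file: Path, total: int,
                   handle: Callable[[int], None]) -> int:
    """Process chunks 0..total-1, skipping those already recorded. Returns chunks run now."""
    start = 0
    if progress_file.exists():
        start = json.loads(progress_file.read_text())["next_chunk"]
    ran = 0
    for i in range(start, total):
        # A crash inside handle() means chunk i runs again next time,
        # so handle() itself must be idempotent per chunk.
        handle(i)
        tmp = progress_file.with_suffix(".tmp")
        tmp.write_text(json.dumps({"next_chunk": i + 1}))
        os.replace(tmp, progress_file)  # atomic checkpoint update
        ran += 1
    return ran
```

Checkpointing after every chunk costs one small write per unit; for very fine-grained work you may prefer to record progress every N chunks and accept redoing up to N of them.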
Common pitfalls
- Non-idempotent tools: Without deterministic keys, retries can charge a credit card twice or send a duplicate email. Always wrap external side effects in an idempotency key.
- Job markers in /tmp: Cleared on reboot. Use a real persistence layer for production jobs.
- resume() after an assistant message with no queue: Raises. Either prompt the user for the next message or call prompt() fresh.
- resume() on a fresh agent: Raises "No messages to continue from". resume() does not auto-load from the checkpointer; only prompt() does. Hydrate manually with agent.state.messages = (await cp.load(thread_id)).messages first.
- Forgetting the signal check inside the tool: A long await asyncio.sleep(...) or a for ... in stream loop that ignores signal.is_set() won't honour abort. Drop a check inside any hot loop.
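A sketch of the signal check, assuming signal behaves like an asyncio.Event (the text above only tells us it has is_set()). copy_stream, interruptible_sleep and the Cancelled exception are hypothetical; how your tool should report cancellation back to CubePi (raise or return a result) depends on your CubePi version. The loop checks the flag once per unit of work, and the sleep waits on the event itself so an abort cuts it short.

```python
import asyncio
from typing import AsyncIterator, Optional


class Cancelled(Exception):
    """Raised when the tool notices the abort signal."""


async def copy_stream(chunks: AsyncIterator[bytes], sink: list[bytes],
                      signal: Optional[asyncio.Event] = None) -> int:
    copied = 0
    async for chunk in chunks:
        # Check once per chunk so an abort takes effect within one iteration.
        if signal is not None and signal.is_set():
            raise Cancelled(f"aborted after {copied} chunks")
        sink.append(chunk)
        copied += 1
    return copied


async def interruptible_sleep(seconds: float, signal: asyncio.Event) -> bool:
    """Sleep up to `seconds`; return True early if the signal fires, else False."""
    try:
        await asyncio.wait_for(signal.wait(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False
```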
See also
- Multi-turn (resume()): full semantics.
- Persistent Chat: the simpler restartable scenario.
- SQLite Checkpointing: what's persisted, when.