Skip to main content
Version: Next 🚧

API, events & reference

Agent API​

Constructor​

Agent(
...,
channel: HitlChannel | None = None, # bind a channel to this agent
)

If a channel is bound, the Agent wires its _process_event as the channel's emit callback so HitlRequestEvent / HitlAnswerEvent are dispatched to all agent listeners. A _run_lock (asyncio.Lock) serialises prompt, resume, and respond calls.

Properties​

PropertyReturnsDescription
agent.channelHitlChannel | NoneThe bound channel or None.
agent.in_flight_hitl_requestHitlRequest | NoneThe channel's current pending payload. Raises HitlError if no channel bound.

Methods​

MethodSignatureDescription
load_pending_hitl_request()async → HitlRequest | NoneReads the pending from the checkpointer (even post-detach).
detach()async → NoneEmits AgentSuspendedEvent(pending_request=...) then triggers HitlDetached on the channel future. The loop exits silently; assistant message retains unresolved tool calls; pending_request stays persisted.
respond(*, question_id=, answer=)async → NoneResumes a suspended run. Validates qid matches persistent pending, attaches answer to channel, re-enters the loop via run_agent_loop_resume.
abort_pending(reason=)async → NoneCloses the conversation. Two-phase: Phase 1 signals in-flight await (no lock). Phase 2 appends synthetic deny tool_results + terminal stop_reason="aborted" assistant (under lock).

in_flight_hitl_request is a synchronous property (reads the in-memory channel slot). load_pending_hitl_request() is async (reads from the checkpointer; useful post-detach or in a fresh process).

Events​

Four new events are emitted on the agent's event stream:

EventWhenKey fields
HitlRequestEventChannel receives a new confirm/approve/ask.request: HitlRequest
HitlAnswerEventchannel.answer() or channel.cancel() fires.question_id: str, answer: Any, cancelled: bool, timed_out: bool
AgentSuspendedEventagent.detach() called while HITL was pending.pending_request: HitlRequest
AgentAbortedEventagent.abort_pending() closes the conversation.reason: str

These are all included in the AgentEvent union, so typed listeners automatically cover them. HitlRequestEvent and HitlAnswerEvent are emitted by the channel through the agent's emit binding. AgentSuspendedEvent and AgentAbortedEvent are emitted by the Agent layer (not the loop — the Agent has the channel handle to populate the real pending_request payload).

Trace spans​

When the cubepi[tracing] extra is installed, each HITL await is wrapped in an OpenTelemetry span:

Span nameAttributes
hitl.approvehitl.tool_name, hitl.tool_call_id, hitl.outcome, hitl.from_resume, hitl.duration_seconds
hitl.confirmhitl.question_id, hitl.outcome, hitl.duration_seconds
hitl.askhitl.question_id, hitl.outcome, hitl.duration_seconds

hitl.outcome is one of: approved, denied, edited, answered, cancelled, timed_out, aborted, detached.

The tracing import is lazy — if opentelemetry is not installed, the channel silently falls back to a no-op span.

Error reference​

ExceptionBase classMeaning
HitlCancelled(reason)BaseExceptionHost called channel.cancel(qid).
HitlTimedOut(seconds)BaseExceptionPer-call or channel-default timeout expired.
HitlDetachedBaseExceptionagent.detach() called during HITL await.
HitlAbortedBaseExceptionagent.abort_pending() signalled the agent.
HitlConcurrencyErrorExceptionconfirm/approve/ask called while channel already has a pending request.
HitlStaleAnswerExceptionchannel.answer(qid) with a question_id that doesn't match the current pending.
HitlNoPendingRequestExceptionagent.respond(â€Ļ) called but no pending_request on the thread.
HitlDurabilityNotGuaranteedExceptionCustom tool called CheckpointedChannel.ask() without allow_inside_custom_tool=True.

HitlControlException (the parent of the four BaseException subclasses) is intentionally NOT caught by the existing broad except Exception: handlers in cubepi.agent.tools._prepare_tool_call and _execute_prepared — this mirrors asyncio.CancelledError's pattern.

Testing helpers​

from cubepi.hitl.testing import ScriptedChannel, NoopChannel

# ScriptedChannel: pre-programmed answers, consumed in order.
ch = ScriptedChannel(answers=[
ApproveAnswer(decision="approve"),
{"color": "red"}, # ask answer
lambda req: ApproveAnswer(decision="deny", reason="test") # callable
])
assert len(ch.history) == 3 # all HitlRequests ever seen

# NoopChannel: auto-approves everything. Useful for subagents.
ch = NoopChannel()
assert (await ch.approve("bash", "tc", {})).decision == "approve"
assert (await ch.confirm("?")) is True
assert await ch.ask([Question(key="k", prompt="p")]) == {"k": ""}

Architecture notes​

  • Single pending per thread. The agent loop is sequential — at most one HITL request is outstanding per thread_id. Concurrent confirm/approve/ask raises HitlConcurrencyError.
  • Prompt-cache prefix invariant. Between pause and resume, the messages list changes only by appending tool-result messages and the next assistant turn at the tail. No inserting, reordering, or mutating prior messages — that would invalidate the provider-side prompt cache.
  • question_id == tool_call_id for approve requests. No aliasing or mapping needed — hosts that already track call_id from the tool stream pass it directly.
  • Resume does not replay. It re-enters the loop with the answer pre-loaded into the channel. The last assistant message's unresolved tool calls dictate what executes next. No node-based replay semantics.