Orchestration¶
Multi-agent orchestration for complex conversational workflows. See the Multi-Agent Orchestration guide for usage examples.
Strategies¶
Orchestration ¶
Bases: ABC
Abstract base for orchestration strategies.
Orchestration strategies compose existing primitives
(ConversationPipeline, ConversationRouter, HandoffHandler)
into declarative patterns that can be passed to RoomKit or
create_room.
Subclasses must implement:
- agents: which agents participate in the room.
- install: wire hooks, tools, and state into a room.
agents abstractmethod ¶
Return agents to register and attach to the room.
The framework calls this to determine which agents should be registered on the kit and attached to the room at creation time.
install abstractmethod async ¶
Wire hooks, tools, and state into the room.
Called after agents are registered and attached. Implementations should install room-scoped hooks, set up handoff tools, and initialise conversation state.
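The contract above can be modelled with a small stand-in. This is a minimal sketch, not the library class: `OrchestrationSketch`, `SingleAgent`, `create_room`, and the dict-based room are hypothetical, and the real `agents` and `install` signatures are not shown in this reference.

```python
import asyncio
from abc import ABC, abstractmethod


class OrchestrationSketch(ABC):
    """Stand-in for the abstract base (hypothetical, not the library class)."""

    @abstractmethod
    def agents(self) -> list[str]:
        """Return the agents to register and attach to the room."""

    @abstractmethod
    async def install(self, room: dict) -> None:
        """Wire hooks, tools, and state into the room."""


class SingleAgent(OrchestrationSketch):
    """Trivial strategy with one agent that is always active."""

    def __init__(self, agent_id: str) -> None:
        self._agent_id = agent_id

    def agents(self) -> list[str]:
        return [self._agent_id]

    async def install(self, room: dict) -> None:
        # Runs after the agents are attached: seed room-scoped state.
        room["active_agent_id"] = self._agent_id


async def create_room(strategy: OrchestrationSketch) -> dict:
    # Order mirrors the text: attach agents first, then call install.
    room = {"agents": list(strategy.agents())}
    await strategy.install(room)
    return room


room = asyncio.run(create_room(SingleAgent("agent-triage")))
```

Because both methods are abstract, instantiating `OrchestrationSketch` directly raises `TypeError`, which is how the base enforces that subclasses implement them.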
Pipeline ¶
Pipeline(agents, routing=None, *, supervisor_id=None, voice_channel_id=None, greet_on_handoff=False, greeting_prompt=None, farewell_prompt=None)
Bases: Orchestration
Linear pipeline orchestration strategy.
Agents are chained in order: the first agent is the entry point, each subsequent agent is reachable via handoff from its predecessor.
Example::
    kit = RoomKit(
        orchestration=Pipeline(
            agents=[triage, support, billing],
        ),
    )
    room = await kit.create_room()
Initialise the pipeline strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agents | `list[Agent]` | Ordered list of agents. First agent is the entry point. Stages chain linearly (each agent hands off to the next). | required |
| routing | `dict[str, list[str]] \| None` | Optional keyword hints injected into agent system prompts (not runtime rules). Keys are agent channel IDs, values are keyword lists describing when to route there. | `None` |
| supervisor_id | `str \| None` | Optional supervisor agent ID that receives all events for monitoring. | `None` |
| voice_channel_id | `str \| None` | Voice channel ID for voice-aware handoffs. | `None` |
| greet_on_handoff | `bool` | Whether agents greet on handoff. | `False` |
| greeting_prompt | `str \| None` | Custom greeting prompt template. | `None` |
| farewell_prompt | `str \| None` | Custom farewell prompt template. | `None` |
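To make the linear chaining concrete, the sketch below derives which agent each stage may hand off to. It is a standalone illustration: `linear_handoff_targets` is a hypothetical helper, and whether the real Pipeline also allows backward handoffs is not stated here.

```python
def linear_handoff_targets(agent_ids: list[str]) -> dict[str, list[str]]:
    """Map each agent to its handoff targets in a linear chain."""
    targets: dict[str, list[str]] = {}
    for index, agent_id in enumerate(agent_ids):
        # Each agent reaches only its immediate successor; the last has none.
        targets[agent_id] = agent_ids[index + 1 : index + 2]
    return targets


chain = linear_handoff_targets(["triage", "support", "billing"])
entry_point = next(iter(chain))  # the first agent is the entry point
```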
Swarm ¶
Bases: Orchestration
Swarm orchestration strategy.
Every agent can hand off to every other agent with no phase constraints. Routing uses sticky agent affinity — once an agent is active, it keeps handling until it hands off.
Example::
    kit = RoomKit(
        orchestration=Swarm(
            agents=[sales, support, billing],
            entry="sales",
        ),
    )
    room = await kit.create_room()
Initialise the swarm strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agents | `list[Agent]` | All participating agents. | required |
| entry | `str \| None` | Channel ID of the entry-point agent. Defaults to the first agent's channel ID. | `None` |
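By contrast with a linear chain, a swarm lets every agent reach every other agent. The following is a minimal sketch of that topology and of the `entry` default described above; `swarm_handoff_targets` is a hypothetical helper, not part of the library.

```python
from __future__ import annotations


def swarm_handoff_targets(
    agent_ids: list[str], entry: str | None = None
) -> tuple[str, dict[str, list[str]]]:
    """Return the entry agent and an all-to-all handoff map."""
    if not agent_ids:
        raise ValueError("a swarm needs at least one agent")
    # Mirrors the documented default: fall back to the first agent.
    entry_id = entry if entry is not None else agent_ids[0]
    if entry_id not in agent_ids:
        raise ValueError(f"unknown entry agent: {entry_id}")
    targets = {
        agent_id: [other for other in agent_ids if other != agent_id]
        for agent_id in agent_ids
    }
    return entry_id, targets


entry_id, targets = swarm_handoff_targets(["sales", "support", "billing"])
```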
Supervisor ¶
Supervisor(supervisor, workers, *, strategy=None, auto_delegate=False, async_delivery=False, refine_task=True, refine_instruction=None, delegation_message="I'm dispatching my team to work on this.", wait_for_result=True, share_channels=None)
Bases: Orchestration
Supervisor orchestration strategy.
The supervisor handles all user interaction. Workers are registered
on the kit (so delegate() can find them) but are NOT attached
to the parent room — they run in child rooms.
Examples::
    # Framework-driven: auto-delegate with task refinement
    Supervisor(
        supervisor=coordinator,
        workers=[researcher, writer],
        strategy="sequential",
        auto_delegate=True,
    )

    # Framework-driven: workers get raw user message
    Supervisor(
        supervisor=coordinator,
        workers=[technical, business],
        strategy="parallel",
        auto_delegate=True,
        refine_task=False,
    )

    # Tool-based: AI decides when to delegate
    Supervisor(
        supervisor=coordinator,
        workers=[researcher, writer],
        strategy="sequential",
    )

    # Manual: per-worker tools, AI decides everything
    Supervisor(
        supervisor=coordinator,
        workers=[researcher, writer],
    )
Initialise the supervisor strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| supervisor | `Agent` | The agent that handles user interaction and delegates tasks. | required |
| workers | `list[Agent]` | Agents that run delegated tasks in child rooms. | required |
| strategy | `WorkerStrategy \| str \| None` | Deterministic execution pattern for workers. | `None` |
| auto_delegate | `bool` | If … | `False` |
| async_delivery | `bool` | If … | `False` |
| refine_task | `bool` | Controls whether the framework extracts a clean topic from the user's message before sending to workers. | `True` |
| refine_instruction | `str \| None` | Custom instruction for topic extraction. Overrides the default. | `None` |
| delegation_message | `str \| None` | Message injected into the conversation when workers are dispatched (async mode only). Set to … | `"I'm dispatching my team to work on this."` |
| wait_for_result | `bool` | When strategy is … | `True` |
| share_channels | `list[str] \| None` | Channel IDs from the parent room to share with every child room created during delegation. For example, passing … | `None` |
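The two `strategy` values used in the examples can be pictured with plain asyncio. This sketch rests on an assumption, not on documented behaviour: "sequential" is taken to mean each worker receives the previous worker's output, and "parallel" that all workers receive the same task. The worker coroutines and helper names are hypothetical.

```python
import asyncio
from collections.abc import Awaitable, Callable

Worker = Callable[[str], Awaitable[str]]


async def run_sequential(workers: list[Worker], task: str) -> str:
    """Chain workers: each one receives the previous result (assumed)."""
    result = task
    for worker in workers:
        result = await worker(result)
    return result


async def run_parallel(workers: list[Worker], task: str) -> list[str]:
    """Fan out: every worker receives the same task concurrently (assumed)."""
    return list(await asyncio.gather(*(worker(task) for worker in workers)))


async def researcher(task: str) -> str:
    return f"notes({task})"


async def writer(task: str) -> str:
    return f"draft({task})"


sequential = asyncio.run(run_sequential([researcher, writer], "topic"))
parallel = asyncio.run(run_parallel([researcher, writer], "topic"))
```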
Loop ¶
Loop(agent, reviewers=None, reviewer=None, max_iterations=3, *, strategy=None, async_delivery=False)
Bases: Orchestration
Loop orchestration strategy.
The producing agent generates output, then reviewers evaluate it. If all reviewers approve, the loop ends. Otherwise, feedback is routed back to the producer for revision.
Examples::
    # Single reviewer
    Loop(agent=writer, reviewers=[editor], max_iterations=3)

    # Multiple reviewers, sequential (chained)
    Loop(
        agent=coder,
        reviewers=[security, perf, style],
        strategy="sequential",
    )

    # Multiple reviewers, parallel (fan-out)
    Loop(
        agent=coder,
        reviewers=[security, perf, style],
        strategy="parallel",
    )

    # Voice, async delivery
    Loop(
        agent=writer,
        reviewers=[editor],
        async_delivery=True,
    )
Initialise the loop strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent | `Agent` | The producing agent. | required |
| reviewers | `list[Agent] \| None` | List of reviewing agents. For multiple reviewers, use strategy to control execution order. | `None` |
| reviewer | `Agent \| None` | Single reviewer (convenience, same as …) | `None` |
| max_iterations | `int` | Maximum number of produce-review cycles. | `3` |
| strategy | `WorkerStrategy \| str \| None` | How reviewers execute when there are multiple: … | `None` |
| async_delivery | `bool` | If … | `False` |
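The produce-review cycle can be sketched as a plain loop. This is an illustration under stated assumptions: `Review`, `run_loop`, and the synchronous producer and reviewer callables are hypothetical, and how the library actually passes feedback back to the producer is not documented here.

```python
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Review:
    approved: bool
    feedback: str = ""


Producer = Callable[[str, list[str]], str]
Reviewer = Callable[[str], Review]


def run_loop(
    task: str,
    produce: Producer,
    reviewers: list[Reviewer],
    max_iterations: int = 3,
) -> tuple[str, int, bool]:
    """Return (final draft, iterations used, approved)."""
    feedback: list[str] = []
    draft = ""
    for iteration in range(1, max_iterations + 1):
        draft = produce(task, feedback)
        reviews = [review(draft) for review in reviewers]
        if all(r.approved for r in reviews):
            return draft, iteration, True  # every reviewer approved
        # Otherwise route the rejections back to the producer.
        feedback = [r.feedback for r in reviews if not r.approved]
    return draft, max_iterations, False


def writer(task: str, feedback: list[str]) -> str:
    return f"{task} v{len(feedback) + 1}"


def editor(draft: str) -> Review:
    return Review(draft.endswith("v2"), "needs revision")


result = run_loop("essay", writer, [editor])
```

The `max_iterations` cap is what keeps the loop from running forever when a reviewer never approves; in that case the last draft is returned with `approved` set to `False`.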
Agents¶
Agent ¶
Agent(channel_id, *, provider=None, role=None, description=None, scope=None, voice=None, greeting=None, language=None, auto_greet=True, **kwargs)
Bases: AIChannel
AI agent with structured identity metadata.
Extends AIChannel with role, description, scope,
voice, and greeting fields. The first three are auto-injected
into the system prompt as an identity block; voice is read by
ConversationPipeline.install to auto-wire the voice map;
greeting is spoken directly via TTS when a new voice session
becomes ready (controlled by auto_greet).
When provider is omitted, the agent is config-only: it holds
identity and prompt data for speech-to-speech orchestration via
RealtimeVoiceChannel but cannot generate responses itself.
Example::
    # Full agent (STT → LLM → TTS)
    triage = Agent(
        "agent-triage",
        provider=GeminiAIProvider(config),
        role="Triage receptionist",
        description="Routes callers to the right specialist",
        voice="21m00Tcm4TlvDq8ikWAM",
        system_prompt="Greet callers warmly.",
    )

    # Config-only agent (speech-to-speech)
    triage = Agent(
        "agent-triage",
        role="Triage receptionist",
        description="Routes callers to the right specialist",
        voice="Aoede",
        system_prompt="Greet callers warmly.",
    )
build_identity_block ¶
Build the identity block appended to the system prompt.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| language | `str \| None` | Override language (e.g. from conversation state). Falls back to … | `None` |
Returns None when all identity fields are None.
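A rough sketch of the None-when-empty behaviour follows. The block layout, the labels, and the choice to treat only role, description, and scope as identity fields are assumptions made for illustration; the real output format is not shown in this reference.

```python
from __future__ import annotations


def build_identity_block(
    role: str | None = None,
    description: str | None = None,
    scope: str | None = None,
    language: str | None = None,
) -> str | None:
    """Build an identity block, or None when no identity field is set."""
    identity = [("Role", role), ("Description", description), ("Scope", scope)]
    lines = [f"{label}: {value}" for label, value in identity if value is not None]
    if not lines:
        return None  # nothing to append to the system prompt
    if language is not None:
        # Assumed: the language override is added only to an existing block.
        lines.append(f"Language: {language}")
    return "\n".join(["## Identity", *lines])


block = build_identity_block(role="Triage receptionist", language="fr")
```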
ConversationRouter ¶
Routes events to the appropriate agent.
Installed as a BEFORE_BROADCAST sync hook (priority -100) that stamps routing metadata on events. The EventRouter reads this metadata to skip non-targeted intelligence channels.
Usage::
    router = ConversationRouter(
        rules=[...],
        default_agent_id="agent-triage",
        supervisor_id="agent-supervisor",
    )
    kit.hook(
        HookTrigger.BEFORE_BROADCAST,
        execution=HookExecution.SYNC,
        priority=-100,
    )(router.as_hook())
Initialize the router.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rules | `list[RoutingRule] \| None` | Routing rules evaluated in ascending priority order (lower number = evaluated first, i.e. higher precedence). | `None` |
| default_agent_id | `str \| None` | Fallback agent when no rule matches. | `None` |
| supervisor_id | `str \| None` | Agent that receives escalations. | `None` |
select_agent ¶
Determine which agent should handle this event.
Priority order:
1. If active_agent_id is set and the agent is still in the room -> sticky affinity (agent keeps handling)
2. Evaluate rules in priority order -> first match wins
3. Fall back to default_agent_id
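The three-step priority order can be sketched as a standalone function. `Rule` and this `select_agent` signature are hypothetical stand-ins: the real method takes an event and reads state from the room, which is not reproduced here.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Rule:
    agent_id: str
    matches: Callable[[dict], bool]
    priority: int = 0


def select_agent(
    event: dict,
    rules: list[Rule],
    room_agents: set[str],
    active_agent_id: str | None = None,
    default_agent_id: str | None = None,
) -> str | None:
    # 1. Sticky affinity: the active agent keeps handling while in the room.
    if active_agent_id is not None and active_agent_id in room_agents:
        return active_agent_id
    # 2. Rules in ascending priority order; the first match wins.
    for rule in sorted(rules, key=lambda r: r.priority):
        if rule.matches(event):
            return rule.agent_id
    # 3. Fall back to the default agent.
    return default_agent_id


rules = [
    Rule("agent-support", lambda e: True, priority=10),
    Rule("agent-billing", lambda e: "invoice" in e["text"], priority=-5),
]
agents = {"agent-support", "agent-billing", "agent-triage"}
```

For example, `select_agent({"text": "invoice overdue"}, rules, agents)` picks `agent-billing` because its rule has the lower priority number, while passing `active_agent_id="agent-triage"` short-circuits the rules entirely.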
as_hook ¶
Return a BEFORE_BROADCAST sync hook function.
The hook stamps event.metadata with routing information
that EventRouter uses to filter intelligence channels.
install ¶
Wire routing and handoff in one call.
Registers this router as a BEFORE_BROADCAST sync hook,
builds a HandoffHandler, and calls setup_handoff
on every agent.
Returns the HandoffHandler for further customisation.
RoutingRule ¶
Bases: BaseModel
A routing rule mapping conditions to an agent.
Rules are evaluated in ascending priority order (lower values first).
The first matching rule wins. Use negative priorities to ensure a rule
is evaluated before the default 0, or positive values to defer it.
priority class-attribute instance-attribute ¶
Evaluation order — lower values are checked first (default 0).
RoutingConditions ¶
Bases: BaseModel
Conditions for a routing rule to match.
All specified conditions are ANDed — every non-None field must match.
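The AND semantics can be illustrated with a small dataclass. The field names below (`channel_type`, `phase`, `intent`) are hypothetical; the actual fields of RoutingConditions are not listed in this reference.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass(frozen=True)
class ConditionsSketch:
    # Hypothetical fields; None means "do not constrain on this field".
    channel_type: str | None = None
    phase: str | None = None
    intent: str | None = None

    def matches(self, context: dict[str, str]) -> bool:
        """Every non-None field must equal the context value."""
        for spec in fields(self):
            expected = getattr(self, spec.name)
            if expected is not None and context.get(spec.name) != expected:
                return False
        return True


review_only = ConditionsSketch(phase="review")
review_billing = ConditionsSketch(phase="review", intent="billing")
```

A consequence of this design is that a conditions object with no fields set matches every event.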
Conversation State¶
ConversationState ¶
Bases: BaseModel
Tracks conversation progress within a room.
All fields are optional with sensible defaults so rooms without orchestration have zero overhead.
transition ¶
Create a new state with a phase transition recorded.
Returns a new instance (immutable pattern) — the original is never modified.
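The immutable pattern can be sketched with frozen dataclasses in place of the pydantic models. `StateSketch`, `TransitionSketch`, their fields, and the "intake" default phase are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TransitionSketch:
    from_phase: str
    to_phase: str
    reason: str = ""


@dataclass(frozen=True)
class StateSketch:
    phase: str = "intake"  # hypothetical default phase
    history: tuple[TransitionSketch, ...] = ()

    def transition(self, to_phase: str, reason: str = "") -> StateSketch:
        """Return a new state with the transition appended to the history."""
        record = TransitionSketch(self.phase, to_phase, reason)
        return replace(self, phase=to_phase, history=self.history + (record,))


before = StateSketch()
after = before.transition("handling", "triage complete")
```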
ConversationPhase ¶
Bases: StrEnum
Built-in conversation phases.
Users can use custom string values — routing and state do not restrict phases to this enum.
PhaseTransition ¶
Bases: BaseModel
Audit record for a phase change.
get_conversation_state ¶
Extract typed ConversationState from room metadata.
Returns a fresh default state if no orchestration state exists.
set_conversation_state ¶
Return a room copy with updated conversation state.
Does NOT persist — the caller must save via store.update_room().
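The copy-then-save contract of these two helpers can be shown with plain dicts. This is a sketch only: the `conversation_state` metadata key, the default state contents, and the dict-shaped room are hypothetical.

```python
import copy

STATE_KEY = "conversation_state"  # hypothetical metadata key


def get_state(room: dict) -> dict:
    """Return the stored state, or a fresh default when none exists."""
    stored = room.get("metadata", {}).get(STATE_KEY)
    if stored is None:
        return {"phase": "intake", "active_agent_id": None}
    return dict(stored)


def set_state(room: dict, state: dict) -> dict:
    """Return a copy of the room with the state updated; nothing is saved."""
    updated = copy.deepcopy(room)
    updated.setdefault("metadata", {})[STATE_KEY] = dict(state)
    return updated


room = {"id": "room-1", "metadata": {}}
new_room = set_state(room, {"phase": "handling", "active_agent_id": "agent-billing"})
```

Because the original `room` is left untouched, the caller still has to write `new_room` back to the store for the change to take effect.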
Pipeline¶
ConversationPipeline ¶
Generates routing rules for sequential agent workflows.
Example::
    pipeline = ConversationPipeline(
        stages=[
            PipelineStage(phase="analysis", agent_id="agent-discuss", next="coding"),
            PipelineStage(phase="coding", agent_id="agent-coder", next="review"),
            PipelineStage(phase="review", agent_id="agent-reviewer",
                          next="report", can_return_to={"coding"}),
            PipelineStage(phase="report", agent_id="agent-writer", next=None),
        ],
        supervisor_id="agent-supervisor",
    )
    router = pipeline.to_router()
get_allowed_transitions ¶
Return phase -> allowed next phases for validation.
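One plausible way to derive the transition map from the stages in the example above is sketched below. `StageSketch` mirrors the PipelineStage fields used in that example; combining `next` with `can_return_to` is an assumption about how the map is built.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StageSketch:
    phase: str
    agent_id: str
    next: str | None = None
    can_return_to: set[str] = field(default_factory=set)


def allowed_transitions(stages: list[StageSketch]) -> dict[str, set[str]]:
    """Map each phase to the phases it may move to."""
    allowed: dict[str, set[str]] = {}
    for stage in stages:
        targets = set(stage.can_return_to)  # backward moves, if any
        if stage.next is not None:
            targets.add(stage.next)  # forward move
        allowed[stage.phase] = targets
    return allowed


stages = [
    StageSketch("analysis", "agent-discuss", next="coding"),
    StageSketch("coding", "agent-coder", next="review"),
    StageSketch("review", "agent-reviewer", next="report", can_return_to={"coding"}),
    StageSketch("report", "agent-writer"),
]
transitions = allowed_transitions(stages)
```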
install ¶
install(kit, agents, *, agent_aliases=None, hook_priority=-100, greet_on_handoff=False, voice_channel_id=None, greeting_prompt=None, farewell_prompt=None)
Wire routing and handoff in one call.
Creates a router from this pipeline, registers it as a
BEFORE_BROADCAST sync hook, builds a HandoffHandler
with the pipeline's phase map and transition constraints,
and calls setup_handoff on every agent.
When greet_on_handoff is True, two extra hooks are
registered:
- ON_HANDOFF (async): blocks the old agent's farewell via a BEFORE_TTS flag, then sends a synthetic inbound message on voice_channel_id to prompt the new agent to greet.
- BEFORE_TTS (sync): blocks TTS while a handoff is pending.
Returns (router, handler) for further customisation.
PipelineStage ¶
Bases: BaseModel
A stage in a conversation pipeline.
Handoff¶
HANDOFF_TOOL module-attribute ¶
HANDOFF_TOOL = AITool(
    name='handoff_conversation',
    description="Transfer this conversation to another agent or specialist. Use when: the user asks to speak with someone else, the user wants to be transferred or go back to a previous agent, the conversation needs expertise you don't have, or your task is complete and the next step requires a different agent. Always provide a clear summary of the conversation so far.",
    parameters={
        'type': 'object',
        'properties': {
            'target': {'type': 'string', 'description': "Target agent ID or alias (e.g., 'agent-advisor', 'agent-coder', 'human')"},
            'reason': {'type': 'string', 'description': 'Why the handoff is needed'},
            'summary': {'type': 'string', 'description': 'Summary of the conversation so far and what the next agent needs to know to continue effectively'},
            'next_phase': {'type': 'string', 'description': "Optional: conversation phase to transition to (e.g., 'handling', 'review', 'escalation')"},
            'channel_escalation': {'type': 'string', 'enum': ['same', 'voice', 'email', 'sms'], 'description': "Whether to escalate to a different channel. 'same' keeps the current channel."},
        },
        'required': ['target', 'reason', 'summary'],
    },
)
build_handoff_tool ¶
Build a handoff tool with constrained target enum.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| targets | `list[tuple[str, str \| None]]` | List of … | required |

Returns:
| Type | Description |
|---|---|
| `AITool` | An AITool restricting the AI to only the listed agent IDs. |
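Constraining the target amounts to adding an `enum` to the `target` property of the tool's JSON schema. The sketch below builds only the parameter schema as a plain dict. Treating each tuple as `(agent_id, description)` is an assumption, since the parameter description is truncated in this reference, and `build_handoff_schema` is a hypothetical name.

```python
from __future__ import annotations


def build_handoff_schema(targets: list[tuple[str, str | None]]) -> dict:
    """Build a handoff parameter schema whose target is limited to an enum."""
    agent_ids = [agent_id for agent_id, _ in targets]
    # Assumed: the optional second element describes the agent.
    options = [
        f"{agent_id}: {description}" if description else agent_id
        for agent_id, description in targets
    ]
    return {
        "type": "object",
        "properties": {
            "target": {
                "type": "string",
                "enum": agent_ids,
                "description": "Target agent ID. Options: " + "; ".join(options),
            },
            "reason": {"type": "string"},
            "summary": {"type": "string"},
        },
        "required": ["target", "reason", "summary"],
    }


schema = build_handoff_schema([("agent-billing", "Handles invoices"), ("human", None)])
```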
HandoffHandler ¶
HandoffHandler(kit, router, agent_aliases=None, phase_map=None, allowed_transitions=None, known_agents=None, on_handoff_complete=None, event_channel_id=None)
Processes handoff tool calls.
Intercepts the handoff_conversation tool, updates conversation
state, emits a system event, and returns the result to the agent.
on_handoff_complete property writable ¶
Callback invoked after a successful handoff.
get_room_language ¶
Get effective language: room override > agent default.
handle async ¶
Process a handoff request.
Steps:
1. Resolve target agent (alias -> channel_id)
2. Validate target exists in room
3. Update ConversationState
4. Persist state to room metadata
5. Emit system event in room
6. Return result to calling agent
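The six steps can be traced in a self-contained sketch. Everything here is a stand-in: `RoomSketch`, `ResultSketch`, `handle_handoff`, the state keys, and the event string are hypothetical, and the in-memory room replaces the real store and event system.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class RoomSketch:
    agents: set[str]
    metadata: dict = field(default_factory=dict)
    events: list[str] = field(default_factory=list)


@dataclass
class ResultSketch:
    accepted: bool
    new_agent_id: str | None = None
    reason: str | None = None


async def handle_handoff(
    room: RoomSketch, aliases: dict[str, str], caller: str, target: str, summary: str
) -> ResultSketch:
    agent_id = aliases.get(target, target)  # 1. resolve alias -> channel_id
    if agent_id not in room.agents:  # 2. validate target exists in room
        return ResultSketch(False, reason=f"unknown target: {target}")
    state = dict(room.metadata.get("state", {}))  # 3. update state
    state.update(active_agent_id=agent_id, previous_agent_id=caller, summary=summary)
    room.metadata["state"] = state  # 4. persist to room metadata
    room.events.append(f"handoff:{caller}->{agent_id}")  # 5. emit system event
    return ResultSketch(True, new_agent_id=agent_id)  # 6. return result


room = RoomSketch(agents={"agent-triage", "agent-billing"})
ok = asyncio.run(
    handle_handoff(room, {"billing": "agent-billing"}, "agent-triage", "billing", "Invoice query")
)
rejected = asyncio.run(handle_handoff(room, {}, "agent-triage", "agent-legal", "n/a"))
```

Validation happens before any state change, so a rejected handoff leaves the room's state and event log untouched.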
send_greeting async ¶
Send the initial agent greeting for a room.
Looks up the current active agent from conversation state and
sends its greeting. For RealtimeVoiceChannel,
injects text directly into the provider session. For traditional
voice, sends a synthetic inbound message to trigger an AI response.
Call this after setting the initial conversation state::
    room = set_conversation_state(room, ConversationState(...))
    await kit.store.update_room(room)
    await handler.send_greeting(room_id, channel_id="voice")
Does nothing when the active agent has no greeting configured.
set_language async ¶
Change the conversation language for a room.
Stores the language in conversation state and, for realtime sessions, reconfigures the active agent's session with an updated system prompt that includes the language instruction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| room_id | `str` | The room to update. | required |
| language | `str` | Language name or code (e.g. …) | required |
| channel_id | `str \| None` | Voice channel ID. Falls back to the event channel configured by … | `None` |
HandoffRequest ¶
Bases: BaseModel
Parsed from agent's handoff tool call arguments.
HandoffResult ¶
Bases: BaseModel
Result returned to the calling agent.
HandoffMemoryProvider ¶
Bases: MemoryProvider
Injects handoff context (summary, reason) into agent prompts.
Wraps an inner MemoryProvider and prepends handoff information when the conversation state indicates a recent handoff.
setup_handoff ¶
Wire handoff into an AIChannel's tool chain.
- Injects the handoff tool into the channel's tool definitions
- Wraps the tool handler to intercept handoff_conversation calls
- Uses _room_id_var ContextVar for room_id (set by routing hook)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel | `AIChannel` | The AI channel to wire handoff into. | required |
| handler | `HandoffHandler` | The handoff handler that processes tool calls. | required |
| tool | `AITool \| None` | Optional custom handoff tool (e.g. from …) | `None` |
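The interception described above can be sketched as a wrapper around an existing tool handler, with the room ID carried in a `ContextVar`. The handler signature `(name, arguments)`, the `wrap_tool_handler` helper, and the `room_id_var` name are assumptions for illustration and do not reflect the library's internals.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from contextvars import ContextVar

# Stand-in for the room ID variable that the routing hook would set.
room_id_var: ContextVar[str | None] = ContextVar("room_id", default=None)

ToolHandler = Callable[[str, dict], Awaitable[str]]
HandoffCallback = Callable[[object, dict], Awaitable[str]]


def wrap_tool_handler(original: ToolHandler, on_handoff: HandoffCallback) -> ToolHandler:
    """Intercept handoff_conversation calls; delegate all other tools."""

    async def wrapped(name: str, arguments: dict) -> str:
        if name == "handoff_conversation":
            # The room ID comes from the context, not from the tool arguments.
            return await on_handoff(room_id_var.get(), arguments)
        return await original(name, arguments)

    return wrapped


async def demo() -> tuple[str, str]:
    async def original(name: str, arguments: dict) -> str:
        return f"ran {name}"

    async def on_handoff(room_id: object, arguments: dict) -> str:
        return f"{room_id} -> {arguments['target']}"

    handler = wrap_tool_handler(original, on_handoff)
    room_id_var.set("room-1")  # in the library this is done by the routing hook
    handoff = await handler("handoff_conversation", {"target": "agent-billing"})
    other = await handler("lookup_order", {})
    return handoff, other


results = asyncio.run(demo())
```

Using a `ContextVar` keeps the room ID scoped to the current task, so concurrent conversations handled by the same channel do not see each other's value.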