Skip to content

Orchestration

Multi-agent orchestration for complex conversational workflows. See the Multi-Agent Orchestration guide for usage examples.

Strategies

Orchestration

Bases: ABC

Abstract base for orchestration strategies.

Orchestration strategies compose existing primitives (ConversationPipeline, ConversationRouter, HandoffHandler) into declarative patterns that can be passed to RoomKit or create_room.

Subclasses must implement:

  • :meth:agents — which agents participate in the room.
  • :meth:install — wire hooks, tools, and state into a room.

agents abstractmethod

agents()

Return agents to register and attach to the room.

The framework calls this to determine which agents should be registered on the kit and attached to the room at creation time.

install abstractmethod async

install(kit, room_id)

Wire hooks, tools, and state into the room.

Called after agents are registered and attached. Implementations should install room-scoped hooks, set up handoff tools, and initialise conversation state.

Pipeline

Pipeline(agents, routing=None, *, supervisor_id=None, voice_channel_id=None, greet_on_handoff=False, greeting_prompt=None, farewell_prompt=None)

Bases: Orchestration

Linear pipeline orchestration strategy.

Agents are chained in order: the first agent is the entry point, each subsequent agent is reachable via handoff from its predecessor.

Example::

kit = RoomKit(
    orchestration=Pipeline(
        agents=[triage, support, billing],
    ),
)
room = await kit.create_room()

Initialise the pipeline strategy.

Parameters:

Name Type Description Default
agents list[Agent]

Ordered list of agents. First agent is the entry point. Stages chain linearly (each agent hands off to the next).

required
routing dict[str, list[str]] | None

Optional keyword hints injected into agent system prompts (not runtime rules). Keys are agent channel IDs, values are keyword lists describing when to route there.

None
supervisor_id str | None

Optional supervisor agent ID that receives all events for monitoring.

None
voice_channel_id str | None

Voice channel ID for voice-aware handoffs.

None
greet_on_handoff bool

Whether agents greet on handoff.

False
greeting_prompt str | None

Custom greeting prompt template.

None
farewell_prompt str | None

Custom farewell prompt template.

None

agents

agents()

Return all pipeline agents.

install async

install(kit, room_id)

Wire pipeline routing and handoff into the room.

Swarm

Swarm(agents, entry=None)

Bases: Orchestration

Swarm orchestration strategy.

Every agent can hand off to every other agent with no phase constraints. Routing uses sticky agent affinity — once an agent is active, it keeps handling until it hands off.

Example::

kit = RoomKit(
    orchestration=Swarm(
        agents=[sales, support, billing],
        entry="sales",
    ),
)
room = await kit.create_room()

Initialise the swarm strategy.

Parameters:

Name Type Description Default
agents list[Agent]

All participating agents.

required
entry str | None

Channel ID of the entry-point agent. Defaults to the first agent's channel ID.

None

agents

agents()

Return all swarm agents.

install async

install(kit, room_id)

Wire swarm routing and bidirectional handoff into the room.

Supervisor

Supervisor(supervisor, workers, *, strategy=None, auto_delegate=False, async_delivery=False, refine_task=True, refine_instruction=None, delegation_message="I'm dispatching my team to work on this.", wait_for_result=True, share_channels=None)

Bases: Orchestration

Supervisor orchestration strategy.

The supervisor handles all user interaction. Workers are registered on the kit (so delegate() can find them) but are NOT attached to the parent room — they run in child rooms.

Examples::

# Framework-driven: auto-delegate with task refinement
Supervisor(
    supervisor=coordinator,
    workers=[researcher, writer],
    strategy="sequential",
    auto_delegate=True,
)

# Framework-driven: workers get raw user message
Supervisor(
    supervisor=coordinator,
    workers=[technical, business],
    strategy="parallel",
    auto_delegate=True,
    refine_task=False,
)

# Tool-based: AI decides when to delegate
Supervisor(
    supervisor=coordinator,
    workers=[researcher, writer],
    strategy="sequential",
)

# Manual: per-worker tools, AI decides everything
Supervisor(
    supervisor=coordinator,
    workers=[researcher, writer],
)

Initialise the supervisor strategy.

Parameters:

Name Type Description Default
supervisor Agent

The agent that handles user interaction and delegates tasks.

required
workers list[Agent]

Agents that run delegated tasks in child rooms.

required
strategy WorkerStrategy | str | None

Deterministic execution pattern for workers.

  • "sequential": workers run in order, each receiving the previous worker's output.
  • "parallel": all workers run concurrently on the same task.
  • None (default): per-worker delegate_to_<id> tools are injected and the AI decides when to call them.
None
auto_delegate bool

If True, the framework triggers workers automatically — no tool needed. For sync channels (CLI), blocks until results are ready. For async channels (voice), runs in background. Requires strategy.

False
async_delivery bool

If True, workers run in the background and results are delivered via kit.deliver() when ready. The conversation continues uninterrupted. For voice channels, injects a delegate_workers tool that the AI calls when delegation is needed. If False (default), blocks until results are ready (sync mode).

False
refine_task bool

Controls whether the framework extracts a clean topic from the user's message before sending to workers.

True
refine_instruction str | None

Custom instruction for topic extraction. Overrides the default.

None
delegation_message str | None

Message injected into the conversation when workers are dispatched (async mode only). Set to None to disable. Default: "I'm dispatching my team to work on this."

"I'm dispatching my team to work on this."
wait_for_result bool

When strategy is None, controls whether delegation runs inline (True, default) or in the background (False). Ignored when strategy is set or auto_delegate is True.

True
share_channels list[str] | None

Channel IDs from the parent room to share with every child room created during delegation. For example, passing ["system", "ws-status"] attaches those channels to each worker's child room so the worker can emit events visible on those channels.

None

agents

agents()

Return agents to attach to the room.

In async_delivery mode, the supervisor is NOT attached — the conversation channel (e.g. RealtimeVoiceChannel) handles user interaction independently.

install async

install(kit, room_id)

Wire supervisor routing and delegation tools.

Loop

Loop(agent, reviewers=None, reviewer=None, max_iterations=3, *, strategy=None, async_delivery=False)

Bases: Orchestration

Loop orchestration strategy.

The producing agent generates output, then reviewers evaluate it. If all reviewers approve, the loop ends. Otherwise, feedback is routed back to the producer for revision.

Examples::

# Single reviewer
Loop(agent=writer, reviewers=[editor], max_iterations=3)

# Multiple reviewers — sequential (chained)
Loop(
    agent=coder,
    reviewers=[security, perf, style],
    strategy="sequential",
)

# Multiple reviewers — parallel (fan-out)
Loop(
    agent=coder,
    reviewers=[security, perf, style],
    strategy="parallel",
)

# Voice — async delivery
Loop(
    agent=writer,
    reviewers=[editor],
    async_delivery=True,
)

Initialise the loop strategy.

Parameters:

Name Type Description Default
agent Agent

The producing agent.

required
reviewers list[Agent] | None

List of reviewing agents. For multiple reviewers, use strategy to control execution order.

None
reviewer Agent | None

Single reviewer (convenience, same as reviewers=[reviewer]).

None
max_iterations int

Maximum number of produce-review cycles.

3
strategy WorkerStrategy | str | None

How reviewers execute when there are multiple:

  • "sequential": reviewers chain — each sees the previous reviewer's feedback.
  • "parallel": reviewers fan-out — all review independently, feedback combined.
  • None (default): sequential for multiple reviewers, single reviewer doesn't need a strategy.
None
async_delivery bool

If True, the loop runs in the background and results are delivered via kit.deliver() when ready. The conversation continues uninterrupted.

False

agents

agents()

Return the producer — it presents results to the user.

install async

install(kit, room_id)

Wire the framework-driven loop.

Agents

Agent

Agent(channel_id, *, provider=None, role=None, description=None, scope=None, voice=None, greeting=None, language=None, auto_greet=True, **kwargs)

Bases: AIChannel

AI agent with structured identity metadata.

Extends :class:AIChannel with role, description, scope, voice, and greeting fields. The first three are auto-injected into the system prompt as an identity block; voice is read by :meth:ConversationPipeline.install to auto-wire the voice map; greeting is spoken directly via TTS when a new voice session becomes ready (controlled by auto_greet).

When provider is omitted the agent is config-only — it holds identity and prompt data for speech-to-speech orchestration via :class:RealtimeVoiceChannel but cannot generate responses itself.

Example::

# Full agent (STT → LLM → TTS)
triage = Agent(
    "agent-triage",
    provider=GeminiAIProvider(config),
    role="Triage receptionist",
    description="Routes callers to the right specialist",
    voice="21m00Tcm4TlvDq8ikWAM",
    system_prompt="Greet callers warmly.",
)

# Config-only agent (speech-to-speech)
triage = Agent(
    "agent-triage",
    role="Triage receptionist",
    description="Routes callers to the right specialist",
    voice="Aoede",
    system_prompt="Greet callers warmly.",
)

is_config_only property

is_config_only

Whether this agent has no AI provider (config-only mode).

system_prompt property

system_prompt

The base system prompt for this agent.

build_identity_block

build_identity_block(language=None)

Build the identity block appended to the system prompt.

Parameters:

Name Type Description Default
language str | None

Override language (e.g. from conversation state). Falls back to self.language when None.

None

Returns None when all identity fields are None.

ConversationRouter

ConversationRouter(rules=None, default_agent_id=None, supervisor_id=None)

Routes events to the appropriate agent.

Installed as a BEFORE_BROADCAST sync hook (priority -100) that stamps routing metadata on events. The EventRouter reads this metadata to skip non-targeted intelligence channels.

Usage::

router = ConversationRouter(
    rules=[...],
    default_agent_id="agent-triage",
    supervisor_id="agent-supervisor",
)

kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    execution=HookExecution.SYNC,
    priority=-100,
)(router.as_hook())

Initialize the router.

Parameters:

Name Type Description Default
rules list[RoutingRule] | None

Routing rules evaluated in ascending priority order (lower number = evaluated first, i.e. higher precedence).

None
default_agent_id str | None

Fallback agent when no rule matches.

None
supervisor_id str | None

Agent that receives escalations.

None

select_agent

select_agent(event, context, state)

Determine which agent should handle this event.

Priority order: 1. If active_agent_id is set and the agent is still in the room -> sticky affinity (agent keeps handling) 2. Evaluate rules in priority order -> first match wins 3. Fall back to default_agent_id

as_hook

as_hook()

Return a BEFORE_BROADCAST sync hook function.

The hook stamps event.metadata with routing information that EventRouter uses to filter intelligence channels.

install

install(kit, agents, *, agent_aliases=None, phase_map=None, hook_priority=-100)

Wire routing and handoff in one call.

Registers this router as a BEFORE_BROADCAST sync hook, builds a HandoffHandler, and calls setup_handoff on every agent.

Returns the HandoffHandler for further customisation.

RoutingRule

Bases: BaseModel

A routing rule mapping conditions to an agent.

Rules are evaluated in ascending priority order (lower values first). The first matching rule wins. Use negative priorities to ensure a rule is evaluated before the default 0, or positive values to defer it.

priority class-attribute instance-attribute

priority = 0

Evaluation order — lower values are checked first (default 0).

RoutingConditions

Bases: BaseModel

Conditions for a routing rule to match.

All specified conditions are ANDed — every non-None field must match.

Conversation State

ConversationState

Bases: BaseModel

Tracks conversation progress within a room.

All fields are optional with sensible defaults so rooms without orchestration have zero overhead.

transition

transition(to_phase, to_agent=None, reason='', metadata=None)

Create a new state with a phase transition recorded.

Returns a new instance (immutable pattern) — the original is never modified.

ConversationPhase

Bases: StrEnum

Built-in conversation phases.

Users can use custom string values — routing and state do not restrict phases to this enum.

PhaseTransition

Bases: BaseModel

Audit record for a phase change.

get_conversation_state

get_conversation_state(room)

Extract typed ConversationState from room metadata.

Returns a fresh default state if no orchestration state exists.

set_conversation_state

set_conversation_state(room, state)

Return a room copy with updated conversation state.

Does NOT persist — the caller must save via store.update_room().

Pipeline

ConversationPipeline

ConversationPipeline(stages, default_phase=None, supervisor_id=None)

Generates routing rules for sequential agent workflows.

Example::

pipeline = ConversationPipeline(
    stages=[
        PipelineStage(phase="analysis", agent_id="agent-discuss", next="coding"),
        PipelineStage(phase="coding", agent_id="agent-coder", next="review"),
        PipelineStage(phase="review", agent_id="agent-reviewer",
                      next="report", can_return_to={"coding"}),
        PipelineStage(phase="report", agent_id="agent-writer", next=None),
    ],
    supervisor_id="agent-supervisor",
)
router = pipeline.to_router()

stages property

stages

The pipeline stages.

to_router

to_router()

Generate a ConversationRouter from this pipeline.

get_phase_map

get_phase_map()

Return agent_id -> default phase mapping for HandoffHandler.

get_allowed_transitions

get_allowed_transitions()

Return phase -> allowed next phases for validation.

install

install(kit, agents, *, agent_aliases=None, hook_priority=-100, greet_on_handoff=False, voice_channel_id=None, greeting_prompt=None, farewell_prompt=None)

Wire routing and handoff in one call.

Creates a router from this pipeline, registers it as a BEFORE_BROADCAST sync hook, builds a HandoffHandler with the pipeline's phase map and transition constraints, and calls setup_handoff on every agent.

When greet_on_handoff is True, two extra hooks are registered:

  • ON_HANDOFF (async): blocks the old agent's farewell via a BEFORE_TTS flag, then sends a synthetic inbound message on voice_channel_id to prompt the new agent to greet.
  • BEFORE_TTS (sync): blocks TTS while a handoff is pending.

Returns (router, handler) for further customisation.

PipelineStage

Bases: BaseModel

A stage in a conversation pipeline.

Handoff

HANDOFF_TOOL module-attribute

HANDOFF_TOOL = AITool(name='handoff_conversation', description="Transfer this conversation to another agent or specialist. Use when: the user asks to speak with someone else, the user wants to be transferred or go back to a previous agent, the conversation needs expertise you don't have, or your task is complete and the next step requires a different agent. Always provide a clear summary of the conversation so far.", parameters={'type': 'object', 'properties': {'target': {'type': 'string', 'description': "Target agent ID or alias (e.g., 'agent-advisor', 'agent-coder', 'human')"}, 'reason': {'type': 'string', 'description': 'Why the handoff is needed'}, 'summary': {'type': 'string', 'description': 'Summary of the conversation so far and what the next agent needs to know to continue effectively'}, 'next_phase': {'type': 'string', 'description': "Optional: conversation phase to transition to (e.g., 'handling', 'review', 'escalation')"}, 'channel_escalation': {'type': 'string', 'enum': ['same', 'voice', 'email', 'sms'], 'description': "Whether to escalate to a different channel. 'same' keeps the current channel."}}, 'required': ['target', 'reason', 'summary']})

build_handoff_tool

build_handoff_tool(targets)

Build a handoff tool with constrained target enum.

Parameters:

Name Type Description Default
targets list[tuple[str, str | None]]

List of (agent_id, description_or_none) pairs. When empty, returns the generic :data:HANDOFF_TOOL.

required

Returns:

Name Type Description
An AITool

class:AITool whose target parameter has an enum

AITool

restricting the AI to only the listed agent IDs.

HandoffHandler

HandoffHandler(kit, router, agent_aliases=None, phase_map=None, allowed_transitions=None, known_agents=None, on_handoff_complete=None, event_channel_id=None)

Processes handoff tool calls.

Intercepts the handoff_conversation tool, updates conversation state, emits a system event, and returns the result to the agent.

greeting_map property writable

greeting_map

Agent ID to greeting text mapping.

agents property writable

agents

Agent ID to Agent instance mapping.

on_handoff_complete property writable

on_handoff_complete

Callback invoked after a successful handoff.

get_room_language

get_room_language(room, agent_id)

Get effective language: room override > agent default.

handle async

handle(room_id, calling_agent_id, arguments)

Process a handoff request.

Steps: 1. Resolve target agent (alias -> channel_id) 2. Validate target exists in room 3. Update ConversationState 4. Persist state to room metadata 5. Emit system event in room 6. Return result to calling agent

send_greeting async

send_greeting(room_id, *, channel_id=None)

Send the initial agent greeting for a room.

Looks up the current active agent from conversation state and sends its greeting. For :class:RealtimeVoiceChannel, injects text directly into the provider session. For traditional voice, sends a synthetic inbound message to trigger an AI response.

Call this after setting the initial conversation state::

room = set_conversation_state(room, ConversationState(...))
await kit.store.update_room(room)
await handler.send_greeting(room_id, channel_id="voice")

Does nothing when the active agent has no greeting configured.

set_language async

set_language(room_id, language, *, channel_id=None)

Change the conversation language for a room.

Stores the language in conversation state and, for realtime sessions, reconfigures the active agent's session with an updated system prompt that includes the language instruction.

Parameters:

Name Type Description Default
room_id str

The room to update.

required
language str

Language name or code (e.g. "French", "fr").

required
channel_id str | None

Voice channel ID. Falls back to the event channel configured by install().

None

HandoffRequest

Bases: BaseModel

Parsed from agent's handoff tool call arguments.

HandoffResult

Bases: BaseModel

Result returned to the calling agent.

HandoffMemoryProvider

HandoffMemoryProvider(inner)

Bases: MemoryProvider

Injects handoff context (summary, reason) into agent prompts.

Wraps an inner MemoryProvider and prepends handoff information when the conversation state indicates a recent handoff.

setup_handoff

setup_handoff(channel, handler, *, tool=None)

Wire handoff into an AIChannel's tool chain.

  • Injects the handoff tool into the channel's tool definitions
  • Wraps the tool handler to intercept handoff_conversation calls
  • Uses _room_id_var ContextVar for room_id (set by routing hook)

Parameters:

Name Type Description Default
channel AIChannel

The AI channel to wire handoff into.

required
handler HandoffHandler

The handoff handler that processes tool calls.

required
tool AITool | None

Optional custom handoff tool (e.g. from :func:build_handoff_tool). Defaults to the generic :data:HANDOFF_TOOL.

None