Multi-Agent Orchestration¶
Route conversations between multiple AI agents with state tracking, rule-based routing, handoff protocol, and pipeline workflows. Agents can transfer conversations to each other while preserving context, and a supervisor can observe all exchanges.
Quick start¶
from roomkit import (
AIChannel,
ConversationPipeline,
HandoffMemoryProvider,
PipelineStage,
RoomKit,
SlidingWindowMemory,
)
# 1. Define the pipeline
pipeline = ConversationPipeline(
stages=[
PipelineStage(phase="intake", agent_id="agent-triage", next="handling"),
PipelineStage(phase="handling", agent_id="agent-handler", next="review"),
PipelineStage(
phase="review",
agent_id="agent-reviewer",
next="resolution",
can_return_to={"handling"},
),
PipelineStage(phase="resolution", agent_id="agent-resolver", next=None),
],
supervisor_id="agent-supervisor",
)
# 2. Create the framework and register channels
kit = RoomKit()
memory = HandoffMemoryProvider(SlidingWindowMemory(max_events=50))
ai_triage = AIChannel("agent-triage", provider=provider, system_prompt="You triage.", memory=memory)
ai_handler = AIChannel("agent-handler", provider=provider, system_prompt="You handle.", memory=memory)
# ... register all channels
# 3. One-liner: routing hook + handoff wiring on all agents
router, handler = pipeline.install(kit, [ai_triage, ai_handler])
How it works¶
Orchestration has four layers that work together:
Inbound event
-> ConversationRouter (BEFORE_BROADCAST hook, priority -100)
-> Reads ConversationState from Room.metadata
-> Selects agent via: affinity -> rules -> default
-> Stamps _routed_to + _always_process on event metadata
-> EventRouter._process_target()
-> Checks _routed_to for INTELLIGENCE channels
-> Skips non-targeted agents (supervisor always processes)
-> Active agent generates response
-> May call handoff_conversation tool
-> HandoffHandler processes handoff
-> Updates ConversationState
-> Persists to Room.metadata
-> Emits system event
-> Next inbound routes to new agent
ConversationState¶
Tracks conversation progress within a room. Stored in Room.metadata["_conversation_state"] and round-trips through JSON cleanly.
from roomkit import ConversationState, get_conversation_state, set_conversation_state
# Read state from a room
state = get_conversation_state(room)
print(state.phase) # "intake"
print(state.active_agent_id) # "agent-triage" or None
print(state.handoff_count) # 0
# Transition to a new phase (immutable — returns a new instance)
new_state = state.transition(
to_phase="handling",
to_agent="agent-handler",
reason="User request classified as billing issue",
)
# Persist (caller must save via store)
updated_room = set_conversation_state(room, new_state)
await kit.store.update_room(updated_room)
ConversationPhase¶
Six built-in phase names are provided as a StrEnum. You can use any string as a phase name — routing and state do not restrict phases to this enum.
| Phase | Value |
|---|---|
INTAKE |
"intake" |
QUALIFICATION |
"qualification" |
HANDLING |
"handling" |
ESCALATION |
"escalation" |
RESOLUTION |
"resolution" |
FOLLOWUP |
"followup" |
PhaseTransition¶
Every call to state.transition() appends a PhaseTransition audit record to state.phase_history:
for t in state.phase_history:
print(f"{t.from_phase} -> {t.to_phase} by {t.from_agent} ({t.reason})")
ConversationRouter¶
Routes events to agents using a three-tier selection strategy:
- Agent affinity — If
active_agent_idis set and the agent is still in the room, it keeps handling - Rule matching — Evaluate
RoutingRuleobjects in priority order; first match wins - Default fallback — Fall back to
default_agent_id
Events from intelligence channels are never routed (prevents loops).
RoutingRule and RoutingConditions¶
from roomkit import ConversationRouter, RoutingRule, RoutingConditions, ChannelType
router = ConversationRouter(
rules=[
RoutingRule(
agent_id="agent-billing",
conditions=RoutingConditions(
phases={"handling"},
intents={"billing"},
),
priority=0,
),
RoutingRule(
agent_id="agent-support",
conditions=RoutingConditions(
phases={"handling"},
channel_types={ChannelType.SMS},
),
priority=10,
),
],
default_agent_id="agent-triage",
supervisor_id="agent-supervisor",
)
All conditions within a rule are ANDed. Available condition fields:
| Field | Type | Description |
|---|---|---|
phases |
set[str] |
Match when conversation is in one of these phases |
channel_types |
set[ChannelType] |
Match when event source is one of these channel types |
intents |
set[str] |
Match when event.metadata["intent"] is in this set |
source_channel_ids |
set[str] |
Match when event comes from one of these channels |
custom |
Callable |
Custom predicate (event, context, state) -> bool |
Supervisor¶
The supervisor_id agent always receives events regardless of routing. Use this for oversight, logging, or intervention.
One-liner setup¶
Use router.install() to register the hook and wire handoff on all agents in one call:
handler = router.install(
kit,
[ai_triage, ai_billing, ai_tech],
agent_aliases={"billing": "agent-billing"},
phase_map={"agent-billing": "handling"},
)
Manual setup¶
For full control, register the hook and handoff separately:
kit.hook(HookTrigger.BEFORE_BROADCAST, execution=HookExecution.SYNC, priority=-100)(
router.as_hook()
)
The hook runs at priority -100 (before user hooks) and stamps _routed_to and _always_process on event metadata. The EventRouter reads these fields to filter intelligence channels.
Handoff Protocol¶
Agents trigger handoffs by calling the handoff_conversation tool. The framework intercepts the call, validates the target, updates state, and emits a system event.
HandoffHandler¶
from roomkit import HandoffHandler
handler = HandoffHandler(
kit=kit,
router=router,
agent_aliases={"billing": "agent-billing", "human": "human"},
phase_map={"agent-billing": "handling", "agent-resolver": "resolution"},
allowed_transitions=pipeline.get_allowed_transitions(), # enforce pipeline topology
)
| Parameter | Description |
|---|---|
kit |
The RoomKit instance (for room access and event emission) |
router |
The ConversationRouter (for rule validation) |
agent_aliases |
Map friendly names to channel IDs (e.g., "billing" -> "agent-billing") |
phase_map |
Map agent IDs to default phases (used when next_phase not specified) |
allowed_transitions |
Optional dict[str, set[str]] from pipeline.get_allowed_transitions(). When set, handoffs to disallowed phases are rejected. |
setup_handoff¶
Wires the handoff tool into an AIChannel:
This does two things:
- Injects
HANDOFF_TOOLinto the channel's tool definitions - Wraps the tool handler to intercept
handoff_conversationcalls
The handoff tool definition tells the AI when and how to transfer:
{
"name": "handoff_conversation",
"parameters": {
"required": ["target", "reason", "summary"],
"properties": {
"target": "Target agent ID or alias",
"reason": "Why the handoff is needed",
"summary": "Context for the next agent",
"next_phase": "Optional phase to transition to",
"channel_escalation": "same | voice | email | sms"
}
}
}
Human escalation¶
The special target "human" sets active_agent_id to None, allowing all agents to process events (or none, depending on your rules). This is the escape hatch for human-in-the-loop workflows.
HandoffMemoryProvider¶
Wraps an inner MemoryProvider to inject handoff context when a conversation has been transferred:
from roomkit import HandoffMemoryProvider, SlidingWindowMemory
memory = HandoffMemoryProvider(SlidingWindowMemory(max_events=50))
ai = AIChannel("agent-handler", provider=provider, memory=memory)
After a handoff, the receiving agent sees a prepended message like:
[Context from previous agent (agent-triage)]: User needs help with billing. Account #12345, premium plan, last payment was 30 days ago.
ConversationPipeline¶
Syntactic sugar for defining sequential multi-agent workflows. Generates a ConversationRouter from a list of pipeline stages.
from roomkit import ConversationPipeline, PipelineStage
pipeline = ConversationPipeline(
stages=[
PipelineStage(phase="analysis", agent_id="agent-discuss", next="coding"),
PipelineStage(phase="coding", agent_id="agent-coder", next="review"),
PipelineStage(
phase="review",
agent_id="agent-reviewer",
next="report",
can_return_to={"coding"}, # Reviewer can send back to coder
),
PipelineStage(phase="report", agent_id="agent-writer", next=None),
],
supervisor_id="agent-supervisor",
)
router = pipeline.to_router()
One-liner setup¶
Use pipeline.install() to generate the router, register the hook, and wire handoff on all agents:
You can pass agent_aliases and hook_priority as keyword arguments. The handler is created with phase_map and allowed_transitions derived from the pipeline stages automatically.
PipelineStage fields¶
| Field | Type | Description |
|---|---|---|
phase |
str |
Phase name for this stage |
agent_id |
str |
Agent channel ID that handles this phase |
next |
str \| None |
Phase to transition to after this stage |
can_return_to |
set[str] |
Additional phases this stage can transition back to |
Pipeline utilities¶
# agent_id -> default phase mapping (for HandoffHandler.phase_map)
pipeline.get_phase_map()
# {"agent-discuss": "analysis", "agent-coder": "coding", ...}
# phase -> allowed next phases (for validation)
pipeline.get_allowed_transitions()
# {"analysis": {"coding"}, "coding": {"review"}, "review": {"report", "coding"}, ...}
Validation¶
The pipeline validates its graph at construction:
nextmust reference an existing phasecan_return_toentries must reference existing phases- Self-referencing (
next="self_phase") is allowed for loops
Hook triggers¶
Three orchestration-specific hook triggers are available:
| Trigger | Description |
|---|---|
ON_PHASE_TRANSITION |
Fired when the conversation phase changes |
ON_HANDOFF |
Fired when a handoff is accepted |
ON_HANDOFF_REJECTED |
Fired when a handoff is rejected (target not found) |
Example¶
See examples/orchestration_pipeline.py for a runnable demo showing a multi-agent pipeline with handoff between triage, handler, and resolver agents.