Agent Delegation¶
Delegate tasks to background agents while conversations continue. A voice agent can hand off a PR review to a specialist while still chatting with the user — one call replaces ~100 lines of manual boilerplate.
Quick start¶
from roomkit import RoomKit, ChannelCategory
from roomkit.channels.agent import Agent
from roomkit.channels.ai import AIChannel
kit = RoomKit()
# Register a front-facing agent and a background specialist
voice_agent = AIChannel("voice-assistant", provider=my_ai, system_prompt="...")
pr_reviewer = Agent("pr-reviewer", provider=reviewer_ai, role="PR Reviewer")
kit.register_channel(voice_agent)
kit.register_channel(pr_reviewer)
# Set up a room
await kit.create_room(room_id="call-room")
await kit.attach_channel("call-room", "voice-assistant", category=ChannelCategory.INTELLIGENCE)
# Delegate — one call does everything
task = await kit.delegate(
room_id="call-room",
agent_id="pr-reviewer",
task="Review the latest PR on roomkit",
notify="voice-assistant",
)
# Fire and forget, or block for the result
result = await task.wait(timeout=30.0)
print(result.output)
How it works¶
kit.delegate() creates a child room linked to the parent:
Parent room (call-room)
├── voice-call (transport)
├── voice-assistant (intelligence) ← notified when done
└── email-out (transport) ← shared to child
Child room (call-room::task-a1b2c3d4)
├── pr-reviewer (intelligence) ← runs the task
└── email-out (transport) ← shared from parent
The flow:
- Child room created with parent link in metadata
- Agent attached as
INTELLIGENCEin the child room - Channels shared from parent (same provider instance, different binding)
- Task event injected into child room → agent picks it up
- Agent response collected as the task result
- Parent notified via system prompt injection on the
notifychannel - Hooks fired:
ON_TASK_DELEGATED(immediately) andON_TASK_COMPLETED(on finish)
Fire and forget¶
task = await kit.delegate(
room_id="call-room",
agent_id="pr-reviewer",
task="Review the latest PR",
)
# Returns immediately — task runs in the background
Blocking for result¶
task = await kit.delegate(room_id="call-room", agent_id="pr-reviewer", task="...")
result = await task.wait(timeout=30.0)
if result.status == "completed":
print(result.output)
else:
print(f"Failed: {result.error}")
Parallel delegation¶
import asyncio
task_a = await kit.delegate(room_id="room-1", agent_id="reviewer", task="Review PR")
task_b = await kit.delegate(room_id="room-1", agent_id="analyst", task="Analyze metrics")
result_a, result_b = await asyncio.gather(
task_a.wait(timeout=30.0),
task_b.wait(timeout=30.0),
)
Tool integration¶
Let the AI decide when to delegate using setup_delegation():
from roomkit.tasks import DelegateHandler, setup_delegation, build_delegate_tool
handler = DelegateHandler(kit, notify="voice-assistant")
# Constrain which agents the AI can delegate to
tool = build_delegate_tool([
("pr-reviewer", "Reviews GitHub PRs"),
("code-writer", "Writes code from specs"),
])
setup_delegation(voice_agent, handler, tool=tool)
The AI will see a delegate_task tool and can call it naturally:
{
"name": "delegate_task",
"arguments": {
"agent": "pr-reviewer",
"task": "Review PR #42 and summarize findings",
"share_channels": ["email-out"]
}
}
Shared channels¶
Channels shared from the parent use the same provider instance with a different binding:
task = await kit.delegate(
room_id="call-room",
agent_id="pr-reviewer",
task="Review PR and email summary",
share_channels=["email-out"], # same EmailChannel, shared to child
)
The background agent can send emails through the shared channel just like the parent.
Hooks¶
Two hook triggers for observability:
@kit.hook(HookTrigger.ON_TASK_DELEGATED, execution=HookExecution.ASYNC)
async def on_delegated(event, ctx):
task_id = event.metadata["task_id"]
agent_id = event.metadata["agent_id"]
logger.info("Task %s delegated to %s", task_id, agent_id)
@kit.hook(HookTrigger.ON_TASK_COMPLETED, execution=HookExecution.ASYNC)
async def on_completed(event, ctx):
task_id = event.metadata["task_id"]
status = event.metadata["task_status"]
duration = event.metadata["duration_ms"]
logger.info("Task %s: %s in %.0fms", task_id, status, duration)
Callbacks¶
For programmatic handling beyond hooks:
async def handle_result(result):
if result.status == "completed":
await send_notification(result.output)
task = await kit.delegate(
room_id="call-room",
agent_id="pr-reviewer",
task="Review PR",
on_complete=handle_result,
)
Configuration reference¶
| Parameter | Type | Description |
|---|---|---|
room_id |
str |
Parent room ID (required) |
agent_id |
str |
Channel ID of the background agent (required) |
task |
str |
What the agent should do (required) |
context |
dict |
Optional context passed to the agent |
share_channels |
list[str] |
Channel IDs to share from parent |
notify |
str |
Channel ID to update with result (default: agent_id) |
on_complete |
callable |
Async callback (DelegatedTaskResult) -> None |
Custom task runner¶
The default InMemoryTaskRunner uses asyncio.create_task(). For distributed deployments, implement the TaskRunner ABC:
from roomkit.tasks import TaskRunner, DelegatedTask
class RedisTaskRunner(TaskRunner):
async def submit(self, kit, task, *, context=None, on_complete=None):
# Submit to Redis queue
...
async def cancel(self, task_id):
# Cancel via Redis
...
async def close(self):
# Shutdown
...
kit = RoomKit(task_runner=RedisTaskRunner(redis_url="..."))