An open-source Python framework that connects AI agents to voice, SMS, WhatsApp, email, and WebSocket in a single conversation.
pip install roomkit
from roomkit import RoomKit, Agent, Pipeline
from roomkit import WebSocketChannel, VoiceChannel
kit = RoomKit(
    orchestration=Pipeline(agents=[triage, support]),
)
# Register channels
kit.register_channel(WebSocketChannel("web"))
kit.register_channel(VoiceChannel("voice", stt=stt, tts=tts))
# Create a room and attach channels
room = await kit.create_room()
await kit.attach_channel(room.id, "web")
await kit.attach_channel(room.id, "voice")
# Messages flow through hooks, get stored,
# and broadcast to all attached channels
Building conversation systems today is harder than it should be.
Separate integrations for SMS, Email, WhatsApp, chat widgets. Each with its own SDK, webhooks, and quirks.
A customer starts on SMS, continues on email, and finishes on chat. Your system treats them as three strangers.
"What did they say last week?" requires querying 5 different APIs.
+1-555-1234 on SMS is john@example.com on email is "John D." on chat. Connecting these is your problem.
Switching from Twilio to Telnyx means rewriting everything.
Messages flow into rooms, not silos. Switch channels mid-conversation without losing context.
Swap providers without changing application logic. Twilio today, Telnyx tomorrow.
Resolve unknown senders, handle ambiguity with hooks, merge identities across channels.
Intercept, route, moderate, or transform messages at any point. One place for all your logic.
Query conversations, not channels. Full context regardless of how customers reached you.
Layered guardrails, tool policies, chain depth limits, and audit trails. Safety that composes with your logic, not a separate system.
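As a plain-Python sketch — hypothetical classes for illustration, not roomkit's actual API — the room-centric model above looks like this: every message lands in one room timeline, tagged with the channel it arrived on, so one query returns the whole conversation.

```python
from dataclasses import dataclass, field

@dataclass
class Event:
    channel: str  # "sms", "email", "chat", ...
    sender: str
    body: str

@dataclass
class Room:
    id: str
    events: list[Event] = field(default_factory=list)

    def history(self) -> list[str]:
        # One query returns the whole conversation,
        # no matter which channel each message used.
        return [f"[{e.channel}] {e.sender}: {e.body}" for e in self.events]

room = Room(id="case-42")
room.events.append(Event("sms", "+1-555-1234", "My order is late"))
room.events.append(Event("email", "john@example.com", "Following up"))
room.events.append(Event("chat", "John D.", "Any update?"))
print(room.history())
```

Identity resolution is what lets the three senders above collapse into one participant; the data model is what makes the merged history queryable in one place.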
A complete framework for building conversation systems at any scale.
Organize conversations into rooms with participants, events, and channel bindings. Each room is a self-contained conversation context.
SMS, Email, WhatsApp, Teams, Messenger, Voice, Video, WebSocket, AI, and more. Messages flow seamlessly between channels with automatic transcoding.
Built on Python's asyncio from the ground up. Handle thousands of concurrent conversations without blocking.
40+ hook triggers to intercept, modify, or block events at any point. Build content moderation, analytics, AI routing, and more with sync and async hooks.
Resolve unknown senders to known identities. Handle ambiguous cases with hooks for challenges, verification, or manual resolution.
In-memory defaults for development, plug in Redis, PostgreSQL, or custom implementations for production. Storage, locks, and realtime all pluggable.
5 voice backends (FastRTC, SIP, RTP, Twilio WebSocket, Local Audio). Full audio pipeline with AEC, AGC, Denoiser, VAD, DTMF, and Diarization. STT/TTS or speech-to-speech with Gemini Live, OpenAI Realtime, and ElevenLabs.
Connect persistent message sources like WebSocket, NATS, or SSE. Auto-restart with exponential backoff, health monitoring, and backpressure control built-in.
Circuit breakers isolate failing providers. Rate limiting with token buckets. Retry with exponential backoff. Chain depth limits prevent infinite loops.
Vision providers (OpenAI, Gemini), video pipeline with decode, resize, filter, and transform stages. Avatar synthesis with Anam AI. YOLO detection, screen capture, and recording (H.264/VP9).
Tool protocol objects bundle definition and handler. AI thinking with Claude and DeepSeek-R1. Planning tools, dangling tool recovery, large output eviction, and streaming tool loops.
5 memory strategies: sliding window, budget-aware, compacting, summarizing, and retrieval. Pluggable knowledge sources with PostgreSQL full-text search for context enrichment.
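To make the memory strategies concrete, here are plain-Python sketches of the two simplest ones, sliding window and budget-aware. These are illustrative helpers, not roomkit's implementations, and the budget sketch counts characters where a real one would count tokens.

```python
def sliding_window(messages: list[str], max_messages: int) -> list[str]:
    # Keep only the most recent N messages.
    return messages[-max_messages:]

def budget_aware(messages: list[str], max_chars: int) -> list[str]:
    # Keep the most recent messages that fit a size budget,
    # walking backwards from the newest message.
    kept, total = [], 0
    for msg in reversed(messages):
        if total + len(msg) > max_chars:
            break
        kept.append(msg)
        total += len(msg)
    return list(reversed(kept))

history = [f"msg {i}" for i in range(10)]
print(sliding_window(history, 4))  # ['msg 6', 'msg 7', 'msg 8', 'msg 9']
print(budget_aware(history, 15))   # ['msg 7', 'msg 8', 'msg 9']
```

Compacting, summarizing, and retrieval strategies trade recency for relevance; the retrieval variant appears in a full example further down.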
4 declarative strategies to wire agents into conversations. All work with live voice: audio stays connected through handoffs.
Linear agent chain. Triage classifies, handler resolves, closer wraps up.
A → B → C
Any agent can hand off to any other. No fixed order, pure flexibility.
Any ↔ Any
Supervisor delegates tasks to workers in isolated child rooms. Sequential or parallel.
Boss ↓ Workers
Generate, review, iterate. Single or multi-reviewer approval workflows.
Create ↻ Review
from roomkit import RoomKit, Agent, Pipeline
from roomkit.providers.anthropic.ai import AnthropicAIProvider
triage = Agent("triage", provider=ai,
               role="Classify the customer's intent",
               greeting="Hi! How can I help you today?")
handler = Agent("handler", provider=ai,
                role="Resolve the customer's issue")
kit = RoomKit(orchestration=Pipeline(agents=[triage, handler]))
from roomkit import RoomKit, Agent, Swarm
sales = Agent("sales", provider=ai, role="Handle sales inquiries")
support = Agent("support", provider=ai, role="Technical support")
billing = Agent("billing", provider=ai, role="Billing questions")
# Any agent can hand off to any other
kit = RoomKit(
    orchestration=Swarm(
        agents=[sales, support, billing],
        entry="sales",
    )
)
from roomkit import RoomKit, Agent, Supervisor
coordinator = Agent("coordinator", provider=ai, role="Coordinate analysis")
technical = Agent("technical", provider=ai, role="Technical analysis")
business = Agent("business", provider=ai, role="Business analysis")
# Workers run in isolated child rooms
kit = RoomKit(
    orchestration=Supervisor(
        supervisor=coordinator,
        workers=[technical, business],
        strategy="parallel",
        auto_delegate=True,
    )
)
from roomkit import RoomKit, Agent, Loop
coder = Agent("coder", provider=ai, role="Write code solutions")
security = Agent("security", provider=ai, role="Security review")
perf = Agent("perf", provider=ai, role="Performance review")
# All reviewers must approve
kit = RoomKit(
    orchestration=Loop(
        agent=coder,
        reviewers=[security, perf],
        strategy="parallel",
    )
)
)
Layered guardrails that compose with your logic. Each layer is independent, optional, and hook-driven.
BEFORE_BROADCAST hooks block, modify, or redact inbound content. Toxicity filtering, PII redaction, jailbreak detection.
ToolPolicy and ON_TOOL_CALL hooks enforce allow/deny lists, validate arguments, and redact sensitive results.
max_tool_rounds, tool_loop_timeout, and max_chain_depth prevent runaway AI loops and resource exhaustion.
Per-channel rate limits (token bucket), circuit breakers for fault isolation, and permission-based access control.
JSONLSessionAuditor records full conversation timelines. Quality scoring, user feedback, and ON_AI_RESPONSE hooks for observability.
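The token-bucket limiter mentioned above can be sketched in a few lines of plain Python. roomkit configures rate limits per channel; the class below is just the algorithm, not its API.

```python
import time

class TokenBucket:
    # Minimal token-bucket sketch: tokens refill at a steady rate,
    # each message spends one, and bursts are capped at capacity.
    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=5, capacity=10)  # 5 msg/s, bursts of 10
results = [bucket.allow() for _ in range(12)]
print(results.count(True))  # the burst is capped near the bucket capacity
```

Unlike a fixed-window counter, a token bucket absorbs short bursts while holding the long-run rate, which is why it suits per-channel message limits.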
Built-in support for popular communication channels with easy extensibility.
Clean, intuitive APIs that make complex operations simple.
from roomkit import RoomKit, HookTrigger, HookResult
kit = RoomKit()
# Content moderation hook
@kit.hook(HookTrigger.BEFORE_BROADCAST)
async def moderate_content(event, ctx):
    if contains_profanity(event.content.body):
        return HookResult.block("Content policy violation")
    return HookResult.allow()
# AI routing hook
@kit.hook(HookTrigger.BEFORE_BROADCAST)
async def route_to_ai(event, ctx):
    if needs_ai_response(event, ctx):
        return HookResult.inject_to(["ai-channel"])
    return HookResult.allow()
from roomkit import RoomKit, AIChannel, Tool
class GetWeatherTool(Tool):
    @property
    def definition(self) -> dict:
        return {
            "name": "get_weather",
            "description": "Get current weather",
            "parameters": {"type": "object", "properties": {
                "city": {"type": "string"}
            }},
        }

    async def handler(self, name, arguments):
        return fetch_weather(arguments["city"])
# Pass Tool objects directly — definition + handler in one class
ai = AIChannel("ai", provider=my_ai, tools=[GetWeatherTool()])
from roomkit import RoomKit, VoiceChannel
from roomkit.voice.stt.deepgram import DeepgramSTTProvider
from roomkit.voice.tts.elevenlabs import ElevenLabsTTSProvider
from roomkit.voice.backends.fastrtc import FastRTCBackend
# Configure voice channel with STT, TTS, and backend
kit = RoomKit()
kit.register_channel(VoiceChannel(
    "voice",
    stt=DeepgramSTTProvider(api_key="..."),
    tts=ElevenLabsTTSProvider(api_key="...", voice_id="..."),
    backend=FastRTCBackend(),
))
# Attach to a room — voice joins the same conversation
await kit.attach_channel(room.id, "voice")
# Transcriptions flow through hooks like any message
from roomkit import RoomKit, RealtimeVoiceChannel
from roomkit.providers.gemini.realtime import GeminiLiveProvider
# Speech-to-speech AI: no STT/TTS pipeline needed
kit = RoomKit()
kit.register_channel(RealtimeVoiceChannel(
    "realtime",
    provider=GeminiLiveProvider(api_key="..."),
    system_prompt="You are a helpful voice assistant.",
    tools=[GetWeatherTool()],
))
# Join a room: audio flows directly to/from AI
await kit.join(room.id, "realtime", websocket)
# Transcriptions appear as RoomEvents in the room
# Text from other channels is injected into the session
from roomkit import RoomKit, VideoChannel
from roomkit.video.vision.openai import OpenAIVisionProvider
from roomkit.video.backends.fastrtc import FastRTCVideoBackend
kit = RoomKit()
kit.register_channel(VideoChannel(
    "video",
    backend=FastRTCVideoBackend(),
    vision=OpenAIVisionProvider(api_key="..."),
))
# Vision AI analyzes frames on demand
# Filters: YOLO detection, watermark, censor
# Recording: H.264/VP9 with room-level A/V sync
await kit.attach_channel(room.id, "video")
from roomkit import RoomKit, Agent
from roomkit.memory import RetrievalMemory
from roomkit.knowledge.postgres import PostgresKnowledgeSource
# Knowledge-enriched context from pluggable sources
memory = RetrievalMemory(
    sources=[PostgresKnowledgeSource(pool=db_pool)],
    max_results=5,
)

agent = Agent(
    "support", provider=ai,
    role="Customer support agent",
    memory=memory,
)
# Agent context is automatically enriched with
# relevant knowledge from your database
from roomkit import RoomKit, BaseSourceProvider, SourceStatus
class NATSSource(BaseSourceProvider):
    def __init__(self, subject: str):
        super().__init__()
        self.subject = subject

    @property
    def name(self) -> str:
        return f"nats:{self.subject}"

    async def start(self, emit):
        self._set_status(SourceStatus.CONNECTED)
        async for msg in self.subscribe():
            await emit(parse_message(msg))
            self._record_message()
# Attach with resilience options
await kit.attach_source(
    "nats-events", NATSSource("chat.>"),
    max_restart_attempts=10,  # Give up after 10 failures
    max_concurrent_emits=20,  # Backpressure control
)
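`max_restart_attempts` pairs with the exponential backoff described above: each failed restart waits twice as long as the last, up to a cap. A minimal sketch of such a schedule (illustrative helper, not roomkit's internals):

```python
def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> list[float]:
    # Delay before restart n is base * 2^n seconds, capped so a
    # long outage never produces multi-hour waits.
    return [min(cap, base * (2 ** n)) for n in range(attempts)]

print(backoff_delays(7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

The cap matters: without it, attempt 10 at a 1-second base would already wait over 17 minutes.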
Define multi-step workflows as serializable directed graphs and run them inside RoomKit rooms. AI agents, human decisions, parallel execution, conditional branching — composed as data, not code.
from roomkit_graph import (
    Graph, Node, Edge, Condition,
    WorkflowEngine, WebhookTrigger,
)

graph = Graph(
    id="bug-triage",
    trigger=WebhookTrigger(source_type="github"),
)
graph.add_nodes(
    Node("start", type="start"),
    Node("triage", type="agent", config={
        "agent_id": "triage-agent",
    }),
    Node("escalate", type="notification"),
    Node("assign", type="agent"),
    Node("end", type="end"),
)
graph.add_edges(
    Edge("start", "triage"),
    Edge("triage", "escalate",
         condition=Condition.field(
             "triage.output.severity"
         ).equals("critical")),
    Edge("triage", "assign",
         condition=Condition.otherwise()),
    Edge("escalate", "end"),
    Edge("assign", "end"),
)
# Run to completion
ctx = await WorkflowEngine(graph).run()
import anthropic

from roomkit import Agent
from roomkit_sandbox import ContainerSandboxExecutor
from roomkit_sandbox.docker_backend import (
    DockerSandboxBackend,
)

agent = Agent(
    name="code-reviewer",
    provider=anthropic.Anthropic(),
    sandbox=ContainerSandboxExecutor(
        backend=DockerSandboxBackend(
            image="ghcr.io/roomkit-live/sandbox:latest",
            memory_limit="512m",
        ),
        session_id="review-session",
        setup_commands=[
            "git clone https://github.com/org/repo.git /workspace/repo",
        ],
    ),
)
# Agent now has sandbox_read, sandbox_write,
# sandbox_git, sandbox_bash, and 6 more tools.
Give AI agents isolated environments to read files, run git commands, and execute shell scripts. Three backends for three isolation levels: containers, pods, or VMs.
Get started with RoomKit in minutes. Check out the documentation for guides, examples, and API reference.