The missing layer between your channels and your logic. Primitives for multi-channel conversations — not a platform, a foundation.
pip install roomkit
from roomkit import RoomKit, WebSocketChannel, ChannelCategory
from roomkit.channels.ai import AIChannel
kit = RoomKit()
# Register channels
kit.register_channel(WebSocketChannel("web"))
kit.register_channel(AIChannel("ai", provider=my_ai))
# Create a room and attach channels
room = await kit.create_room()
await kit.attach_channel(room.id, "web")
await kit.attach_channel(
room.id, "ai", category=ChannelCategory.INTELLIGENCE)
# Process inbound messages
result = await kit.process_inbound(message)
# Messages flow through hooks, get stored,
# and broadcast to all attached channels
Building conversation systems today is harder than it should be.
Separate integrations for SMS, Email, WhatsApp, chat widgets. Each with its own SDK, webhooks, and quirks.
Customer starts on SMS, continues on email, finishes on chat. Your system treats these as 3 strangers.
"What did they say last week?" requires querying 5 different APIs.
+1-555-1234 on SMS is john@example.com on email is "John D." on chat. Connecting these is your problem.
Switching from Twilio to Telnyx means rewriting everything.
Messages flow into rooms, not silos. Switch channels mid-conversation without losing context.
Swap providers without changing application logic. Twilio today, Telnyx tomorrow.
Resolve unknown senders, handle ambiguity with hooks, merge identities across channels.
Intercept, route, moderate, or transform messages at any point. One place for all your logic.
Query conversations, not channels. Full context regardless of how customers reached you.
There are many tools in this space. Here's where RoomKit fits.
Move bytes between services. No concept of conversations or participants.
Focus on NLP, intent detection, and response generation.
Complete applications with UI, dashboards, and hosted infrastructure.
Pipeline or session-based voice AI. Focus on audio processing, not conversations.
| RoomKit | Chatwoot | Twilio | Kombu | Rasa | |
|---|---|---|---|---|---|
| Open source | ✓ | ✓ | ✗ | ✓ | ✓ |
| Self-hosted | ✓ | ✓ | ✗ | ✓ | ✓ |
| Python library | ✓ | ✗ | ✓ | ✓ | ✓ |
| Multi-channel | ✓ | ✓ | ✓ | ✗ | ✗ |
| Room-based | ✓ | ✗ | ✗ | ✗ | ✗ |
| Identity resolution | ✓ | ~ | ✗ | ✗ | ✗ |
| Hook system | ✓ | ✗ | ✗ | ✗ | ✗ |
| Async-first | ✓ | N/A | ✓ | ✓ | ✗ |
| No per-message fees | ✓ | ✓ | ✗ | ✓ | ✓ |
| Real-time voice | ✓ | ✗ | ✓ | ✗ | ✗ |
| AI-ready (llms.txt) | ✓ | ✗ | ✗ | ✗ | ✗ |
| Multi-agent orchestration | ✓ | ✗ | ✗ | ✗ | ✗ |
A complete framework for building conversation systems at any scale.
Organize conversations into rooms with participants, events, and channel bindings. Each room is a self-contained conversation context.
SMS, Email, WhatsApp, Teams, Messenger, Voice, WebSocket, AI, and more. Messages flow seamlessly between channels with automatic transcoding.
Built on Python's asyncio from the ground up. Handle thousands of concurrent conversations without blocking.
35+ hook triggers to intercept, modify, or block events at any point. Build content moderation, analytics, AI routing, and more with sync and async hooks.
Resolve unknown senders to known identities. Handle ambiguous cases with hooks for challenges, verification, or manual resolution.
In-memory defaults for development, plug in Redis, PostgreSQL, or custom implementations for production. Storage, locks, and realtime all pluggable.
4 voice backends (FastRTC, SIP, RTP, Local Audio). Full audio pipeline with AEC, AGC, Denoiser, VAD, DTMF, and Diarization. 4 interruption strategies. STT/TTS or speech-to-speech modes.
Connect persistent message sources like WebSocket, NATS, or SSE. Auto-restart with exponential backoff, health monitoring, and backpressure control built-in.
Circuit breakers isolate failing providers. Rate limiting with token buckets. Retry with exponential backoff. Chain depth limits prevent infinite loops.
Define agents, wire them into pipelines, and hand off conversations — including on live voice calls.
Inbound → ConversationRouter → Active Agent → Handoff → Next Agent
Extends AIChannel with role, voice, greeting, language, and memory. Each agent is a self-contained persona.
Staged workflows that move conversations through phases — triage, handling, resolution — automatically.
Context-preserving handoffs with audit trail. Voice handoffs without disconnect — the caller never knows.
Phase tracking, transition history, and custom context. Know exactly where every conversation stands.
from roomkit.orchestration import ConversationPipeline, PipelineStage
pipeline = ConversationPipeline(stages=[
PipelineStage(phase="triage", agent_id="agent-triage", next="handling"),
PipelineStage(phase="handling", agent_id="agent-handler", next=None),
])
router, handler = pipeline.install(kit, [triage, handler])
Built-in support for popular communication channels with easy extensibility.
Clean, intuitive APIs that make complex operations simple.
from roomkit import RoomKit, HookTrigger, HookResult
kit = RoomKit()
# Content moderation hook
@kit.hook(HookTrigger.BEFORE_BROADCAST)
async def moderate_content(event, ctx):
if contains_profanity(event.content.body):
return HookResult.block("Content policy violation")
return HookResult.allow()
# AI routing hook
@kit.hook(HookTrigger.BEFORE_BROADCAST)
async def route_to_ai(event, ctx):
if needs_ai_response(event, ctx):
return HookResult.inject_to(["ai-channel"])
return HookResult.allow()
from roomkit import RoomKit, HookTrigger, IdentityHookResult
kit = RoomKit(identity_resolver=my_resolver)
# Handle ambiguous identity (multiple matches)
@kit.identity_hook(HookTrigger.ON_IDENTITY_AMBIGUOUS)
async def resolve_ambiguous(event, ctx, id_result):
# Access sender info directly
sender = id_result.address # e.g., "+14185551234"
if sender in known_senders:
identity = get_identity(known_senders[sender])
return IdentityHookResult.resolved(identity)
# Ask user to identify themselves
return IdentityHookResult.pending(
display_name=f"Unknown ({sender})",
candidates=id_result.candidates
)
from roomkit import RoomKit, EphemeralEventType
kit = RoomKit()
# Subscribe to typing indicators and presence
async def on_realtime(event):
if event.type == EphemeralEventType.TYPING_START:
print(f"{event.user_id} is typing...")
elif event.type == EphemeralEventType.PRESENCE_ONLINE:
print(f"{event.user_id} came online")
sub_id = await kit.subscribe_room("room-123", on_realtime)
# Publish typing indicator
await kit.publish_typing("room-123", "user-456")
# Publish read receipt
await kit.publish_read_receipt("room-123", "user-456", "event-789")
from roomkit import RoomKit, VoiceChannel
from roomkit.voice.stt.deepgram import DeepgramSTTProvider
from roomkit.voice.tts.elevenlabs import ElevenLabsTTSProvider
from roomkit.voice.backends.fastrtc import FastRTCBackend
# Configure voice channel with STT, TTS, and backend
kit = RoomKit()
kit.register_channel(VoiceChannel(
"voice",
stt=DeepgramSTTProvider(api_key="..."),
tts=ElevenLabsTTSProvider(api_key="...", voice_id="..."),
backend=FastRTCBackend(),
))
# Attach to a room — voice joins the same conversation
await kit.attach_channel(room.id, "voice")
# Transcriptions flow through hooks like any message
from roomkit import RoomKit, RealtimeVoiceChannel
from roomkit.providers.gemini.realtime import GeminiLiveProvider
from roomkit.voice.realtime.ws_transport import WebSocketRealtimeTransport
# Speech-to-speech AI — no STT/TTS pipeline needed
provider = GeminiLiveProvider(api_key="...")
transport = WebSocketRealtimeTransport()
kit = RoomKit()
kit.register_channel(RealtimeVoiceChannel(
"realtime-voice",
provider=provider,
transport=transport,
system_prompt="You are a helpful voice assistant.",
))
# Connect a participant — audio flows directly to/from Gemini
session = await channel.start_session("room-1", "user-1", websocket)
# Transcriptions appear as RoomEvents in the room
# Text from other channels is injected into the AI session
from roomkit import RoomKit
from roomkit.channels.ai import AIChannel
from roomkit.orchestration import (
Agent, ConversationPipeline, PipelineStage
)
kit = RoomKit()
# Define agents with distinct roles
triage = Agent(
"triage", provider=my_ai,
role="Classify the customer's intent",
greeting="Hi! How can I help you today?",
)
handler = Agent(
"handler", provider=my_ai,
role="Resolve the customer's issue",
)
# Wire into a pipeline
pipeline = ConversationPipeline(stages=[
PipelineStage(phase="triage", agent_id="triage", next="handling"),
PipelineStage(phase="handling", agent_id="handler", next=None),
])
# Install — router and handler are wired automatically
router, on_handoff = pipeline.install(kit, [triage, handler])
from roomkit import RoomKit, BaseSourceProvider, SourceStatus
class NATSSource(BaseSourceProvider):
def __init__(self, subject: str):
super().__init__()
self.subject = subject
@property
def name(self) -> str:
return f"nats:{self.subject}"
async def start(self, emit):
self._set_status(SourceStatus.CONNECTED)
async for msg in self.subscribe():
await emit(parse_message(msg))
self._record_message()
# Attach with resilience options
await kit.attach_source(
"nats-events", NATSSource("chat.>"),
max_restart_attempts=10, # Give up after 10 failures
max_concurrent_emits=20, # Backpressure control
)
RoomKit includes llms.txt and AGENTS.md files that help AI coding assistants understand your codebase and generate correct, idiomatic code.
get_llms_txt()
from roomkit import get_llms_txt, get_agents_md
# Get documentation for LLM context
llms_content = get_llms_txt()
# Get coding guidelines for AI assistants
agents_content = get_agents_md()
# Use in your MCP server or AI tool
context = {
"documentation": llms_content,
"guidelines": agents_content
}
RoomKit is designed for seamless integration with the Model Context Protocol (MCP). Build AI assistants that can manage conversations, send messages, and handle multi-channel communication through a standardized protocol.
Get started with RoomKit in minutes. Check out the documentation for guides, examples, and API reference.