Features¶
Core Features¶
Multi-Channel Conversation Rooms¶
RoomKit provides a room-based abstraction for managing conversations across multiple communication channels simultaneously. A room is a shared conversation context where messages flow between all attached channels.
kit = RoomKit()
# Register channels
kit.register_channel(WebSocketChannel("ws-user"))
kit.register_channel(SMSChannel("sms-user", provider=sms_provider))
kit.register_channel(AIChannel("ai-bot", provider=ai_provider))
# Create a room and attach channels
room = await kit.create_room(room_id="support-123")
await kit.attach_channel("support-123", "ws-user")
await kit.attach_channel("support-123", "sms-user",
metadata={"phone_number": "+15551234567"})
await kit.attach_channel("support-123", "ai-bot", category=ChannelCategory.INTELLIGENCE)
When a message arrives on any channel, it is automatically broadcast to all other attached channels, with content transcoded as needed for each target's capabilities.
Room Lifecycle Management¶
Rooms follow a state machine with four statuses:
stateDiagram-v2
[*] --> ACTIVE: create_room()
ACTIVE --> PAUSED: pause (timer or manual)
PAUSED --> ACTIVE: (resume on activity)
ACTIVE --> CLOSED: close_room() or timer
PAUSED --> CLOSED: close_room() or timer
CLOSED --> ARCHIVED: (archive)
- ACTIVE -- Messages flow normally between all attached channels
- PAUSED -- Room is temporarily suspended; the close timer keeps running
- CLOSED -- Conversation is ended; no new messages accepted
- ARCHIVED -- Final state for long-term storage
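The transitions in the diagram can be sketched as a small lookup table. This is only an illustration: RoomStatus, ALLOWED_TRANSITIONS, and can_transition are hypothetical names chosen for the sketch and are not taken from RoomKit's API.

```python
from enum import Enum


class RoomStatus(Enum):
    """Hypothetical stand-in for the four room statuses listed above."""
    ACTIVE = "active"
    PAUSED = "paused"
    CLOSED = "closed"
    ARCHIVED = "archived"


# Allowed transitions, copied from the state diagram.
ALLOWED_TRANSITIONS = {
    RoomStatus.ACTIVE: {RoomStatus.PAUSED, RoomStatus.CLOSED},
    RoomStatus.PAUSED: {RoomStatus.ACTIVE, RoomStatus.CLOSED},
    RoomStatus.CLOSED: {RoomStatus.ARCHIVED},
    RoomStatus.ARCHIVED: set(),  # final state, nothing follows it
}


def can_transition(current: RoomStatus, target: RoomStatus) -> bool:
    """Return True if the diagram permits moving from current to target."""
    return target in ALLOWED_TRANSITIONS[current]
```

Under this table a CLOSED room can only move to ARCHIVED, which matches the "no new messages accepted" rule for closed conversations.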
Timer-based automation:
from roomkit import RoomTimers
await kit.create_room(
room_id="support-123",
metadata={"timers": RoomTimers(
inactive_after_seconds=300, # Auto-pause after 5min inactivity
closed_after_seconds=3600, # Auto-close after 1hr inactivity
)},
)
# Check and apply timer transitions for all active/paused rooms
transitioned = await kit.check_all_timers()
Event Pipeline¶
Every message passes through a deterministic processing pipeline:
- Inbound routing -- Resolve which room the message belongs to (by channel binding or participant)
- Auto-create -- If no room found, create a new room and attach the channel
- Channel conversion -- handle_inbound() converts the raw message to a RoomEvent
- Identity resolution -- Identify the sender (optional, with timeout)
- Room lock -- Acquire per-room lock for atomic processing
- Idempotency check -- Reject duplicate messages by idempotency_key
- Sync hooks -- Content filtering, modification, or blocking (BEFORE_BROADCAST)
- Event storage -- Persist the event to the conversation store
- Broadcast -- Deliver to all eligible channels via the EventRouter
- Reentry drain -- Process AI response events in a loop (bounded by max_chain_depth)
- Side effects -- Persist tasks and observations
- Async hooks -- Side effects, logging, analytics (AFTER_BROADCAST)
- Activity update -- Update room timestamp and latest event index
Hook System¶
Hooks intercept events at specific points in the pipeline for business logic injection.
Sync hooks run before broadcast and can block, allow, or modify events:
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="profanity_filter")
async def profanity_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    text = Channel.extract_text(event)
    if contains_profanity(text):
        return HookResult.block("Message contains inappropriate language")
    return HookResult.allow()
Async hooks run after broadcast for side effects:
@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC, name="logger")
async def log_event(event: RoomEvent, ctx: RoomContext) -> None:
    await analytics.track("message_sent", {"room": event.room_id})
Hook features:
- Priority ordering -- Hooks execute in priority order (lower numbers first)
- Per-room hooks -- Attach hooks to specific rooms dynamically via add_room_hook()
- Global hooks -- Apply to all rooms via the @kit.hook() decorator
- Timeout protection -- Configurable timeout per hook (default 30s)
- Error isolation -- Hook failures are logged and collected as hook_errors but don't crash the pipeline
- Event injection -- Hooks can inject synthetic events via HookResult.injected_events
- Task/observation creation -- Hooks can create side-effect tasks and observations
- Event filtering -- Hooks can be filtered by channel type, channel ID, and direction
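Priority ordering, timeout protection, and error isolation can be illustrated with a small stand-alone runner. This is a sketch under stated assumptions, not RoomKit's implementation: Registration, Outcome, and run_sync_hooks are hypothetical names, and a hook here is simplified to a coroutine that returns True to allow or False to block.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Optional


@dataclass
class Registration:
    """Hypothetical record for one registered hook."""
    name: str
    priority: int
    fn: Callable[[str], Awaitable[bool]]  # True = allow, False = block
    timeout: float = 30.0


@dataclass
class Outcome:
    allowed: bool = True
    blocked_by: Optional[str] = None
    hook_errors: List[str] = field(default_factory=list)


async def run_sync_hooks(hooks: List[Registration], text: str) -> Outcome:
    outcome = Outcome()
    # Lower priority numbers run first.
    for hook in sorted(hooks, key=lambda h: h.priority):
        try:
            allowed = await asyncio.wait_for(hook.fn(text), timeout=hook.timeout)
        except Exception as exc:  # includes the timeout raised by wait_for
            # Error isolation: record the failure and keep going.
            outcome.hook_errors.append(f"{hook.name}: {exc!r}")
            continue
        if not allowed:
            outcome.allowed = False
            outcome.blocked_by = hook.name
            break  # a blocking hook ends the chain
    return outcome
```

In this sketch a failing or slow hook never blocks the message by itself; whether a real deployment should fail open or fail closed is a design choice the source does not spell out.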
Hook filtering allows hooks to run only for specific event types:
from roomkit import ChannelType, ChannelDirection
# Only run for inbound SMS/MMS events
@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    name="rehost_media",
    channel_types={ChannelType.SMS, ChannelType.MMS},
    directions={ChannelDirection.INBOUND},
)
async def rehost_media(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Only called for inbound SMS/MMS, so no check is needed inside
    ...
    return HookResult.allow()
# Only run for a specific channel
@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    name="voicemeup_specific",
    channel_ids={"sms-voicemeup"},
)
async def voicemeup_hook(event: RoomEvent, ctx: RoomContext) -> HookResult:
    ...
    return HookResult.allow()
Filter options:
- channel_types: set[ChannelType] -- Only run for these channel types (e.g., {ChannelType.SMS})
- channel_ids: set[str] -- Only run for these channel IDs (e.g., {"sms-voicemeup"})
- directions: set[ChannelDirection] -- Only run for these directions (e.g., {ChannelDirection.INBOUND})
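One way to read the three filter options is as independent restrictions that must all match. The sketch below makes two assumptions the source does not state: an unset filter places no restriction, and the set filters are combined with AND. HookFilter and matches are hypothetical names, and plain strings stand in for ChannelType and ChannelDirection.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class HookFilter:
    """Hypothetical filter; None means 'do not restrict on this field'."""
    channel_types: Optional[FrozenSet[str]] = None
    channel_ids: Optional[FrozenSet[str]] = None
    directions: Optional[FrozenSet[str]] = None

    def matches(self, channel_type: str, channel_id: str, direction: str) -> bool:
        # Every filter that is set must match (assumed AND semantics).
        return (
            (self.channel_types is None or channel_type in self.channel_types)
            and (self.channel_ids is None or channel_id in self.channel_ids)
            and (self.directions is None or direction in self.directions)
        )
```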
Hook triggers:
| Trigger | Execution | Use Case |
|---|---|---|
| BEFORE_BROADCAST | Sync | Content filtering, modification, blocking |
| AFTER_BROADCAST | Async | Logging, analytics, notifications |
| ON_ROOM_CREATED | Async | Room initialization |
| ON_ROOM_PAUSED | Async | Inactivity alerts |
| ON_ROOM_CLOSED | Async | Cleanup, archival |
| ON_CHANNEL_ATTACHED | Async | Welcome messages |
| ON_CHANNEL_DETACHED | Async | Farewell messages |
| ON_CHANNEL_MUTED | Async | State tracking |
| ON_CHANNEL_UNMUTED | Async | State tracking |
| ON_IDENTITY_AMBIGUOUS | Both | Multi-candidate disambiguation |
| ON_IDENTITY_UNKNOWN | Both | Unknown sender handling |
| ON_PARTICIPANT_IDENTIFIED | Async | Post-identification actions |
| ON_TASK_CREATED | Async | Task routing |
| ON_ERROR | Async | Error monitoring |
AI Intelligence Layer¶
The AIChannel belongs to a special channel category (INTELLIGENCE) and generates AI responses:
from roomkit import AIChannel, AnthropicAIProvider, AnthropicConfig
provider = AnthropicAIProvider(AnthropicConfig(api_key="sk-..."))
ai = AIChannel(
"ai-assistant",
provider=provider,
system_prompt="You are a helpful customer support agent.",
temperature=0.7,
max_tokens=1024,
max_context_events=50, # Window of conversation history
)
AI features:
- Context-aware -- Builds conversation context from recent room events
- Self-loop prevention -- Skips events from itself to prevent self-echoing
- Chain depth limiting -- Global max_chain_depth (default 5) prevents runaway AI-to-AI loops; events that exceed the limit are stored as BLOCKED with an observation
- Provider-agnostic -- Swap between Anthropic, OpenAI, PydanticAI, or custom providers
- Capability-aware generation -- AI considers target transport channel capabilities when generating responses
- Mute-aware -- Muted AI channels still process events (tasks, observations) but suppress response messages
- Vision support -- Providers with vision capability can receive and process images
Vision Support¶
AI providers can optionally support vision (image processing) by setting supports_vision=True. When enabled:
- AIChannel.capabilities() includes MEDIA in its supported media types
- The transcoder passes images through instead of converting to text
- AIMessage.content becomes multimodal: str | list[AITextPart | AIImagePart]
from roomkit import AIChannel, AIProvider, AIContext, AIResponse, AITextPart, AIImagePart
class VisionAIProvider(AIProvider):
    @property
    def supports_vision(self) -> bool:
        return True  # Enable vision capability
    @property
    def model_name(self) -> str:
        return "gpt-4o"
    async def generate(self, context: AIContext) -> AIResponse:
        for msg in context.messages:
            if isinstance(msg.content, list):
                # Multimodal content: text and images
                for part in msg.content:
                    if isinstance(part, AITextPart):
                        print(f"Text: {part.text}")
                    elif isinstance(part, AIImagePart):
                        print(f"Image: {part.url} ({part.mime_type})")
            else:
                # Plain text content
                print(f"Text: {msg.content}")
        return AIResponse(content="I can see the image!")
# Use with AIChannel
provider = VisionAIProvider()
ai = AIChannel("ai-vision", provider=provider)
# Channel now supports MEDIA
caps = ai.capabilities()
assert caps.supports_media is True
When an MMS with an image arrives in a room with a vision-enabled AI:
MMS (text + image) → Room
↓
AIChannel.capabilities() includes MEDIA
↓
Transcoder: No conversion needed (AI supports MEDIA)
↓
AIChannel extracts multimodal content:
→ [AITextPart("Check this!"), AIImagePart(url="https://...", mime_type="image/jpeg")]
↓
AIProvider.generate() receives multimodal AIMessage
↓
Provider sends image URL to vision model (Claude, GPT-4V, etc.)
The MockAIProvider supports vision for testing:
from roomkit import MockAIProvider
# Enable vision for testing
provider = MockAIProvider(responses=["I see a cat!"], vision=True)
assert provider.supports_vision is True
Realtime Events (Typing, Presence, Read Receipts)¶
RoomKit provides a pluggable realtime backend for ephemeral events that don't require persistence:
from roomkit import RoomKit, EphemeralEvent, EphemeralEventType
kit = RoomKit() # Uses InMemoryRealtime by default
# Subscribe to ephemeral events for a room
async def on_realtime(event: EphemeralEvent):
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")
    elif event.type == EphemeralEventType.READ_RECEIPT:
        print(f"{event.user_id} read event {event.data['event_id']}")
sub_id = await kit.subscribe_room("room-123", on_realtime)
# Publish typing indicator
await kit.publish_typing("room-123", "user-456")
# Publish presence update
await kit.publish_presence("room-123", "user-456", "online")
# Publish read receipt
await kit.publish_read_receipt("room-123", "user-456", "event-789")
# Unsubscribe when done
await kit.unsubscribe_room(sub_id)
Event types:
| Type | Description |
|---|---|
| TYPING_START | User started typing |
| TYPING_STOP | User stopped typing |
| PRESENCE_ONLINE | User is online |
| PRESENCE_AWAY | User is away |
| PRESENCE_OFFLINE | User went offline |
| READ_RECEIPT | User read a message |
| CUSTOM | Custom ephemeral event |
WebSocket integration example:
from fastapi import FastAPI, WebSocket
app = FastAPI()
kit = RoomKit()
@app.websocket("/chat/{room_id}/{user_id}")
async def websocket_chat(websocket: WebSocket, room_id: str, user_id: str):
    await websocket.accept()
    # Forward realtime events to WebSocket
    async def forward_realtime(event: EphemeralEvent):
        await websocket.send_json(event.to_dict())
    sub_id = await kit.subscribe_room(room_id, forward_realtime)
    try:
        while True:
            data = await websocket.receive_json()
            if data["type"] == "typing":
                await kit.publish_typing(room_id, user_id)
            elif data["type"] == "read":
                await kit.publish_read_receipt(room_id, user_id, data["event_id"])
    finally:
        # Always release the subscription, including on disconnect
        await kit.unsubscribe_room(sub_id)
Custom backend for distributed deployments:
The default InMemoryRealtime is single-process only. For distributed deployments, implement RealtimeBackend:
import json
from roomkit import RealtimeBackend, EphemeralEvent, EphemeralCallback, RoomKit
import redis.asyncio as redis
class RedisRealtime(RealtimeBackend):
    def __init__(self, url: str = "redis://localhost:6379"):
        self._redis = redis.from_url(url)
        # ... implementation
    async def publish(self, channel: str, event: EphemeralEvent) -> None:
        await self._redis.publish(channel, json.dumps(event.to_dict()))
    async def subscribe(self, channel: str, callback: EphemeralCallback) -> str:
        # ... implementation
        return sub_id
    async def unsubscribe(self, subscription_id: str) -> bool:
        # ... implementation
        ...
    async def close(self) -> None:
        await self._redis.close()
# Use custom backend
kit = RoomKit(realtime=RedisRealtime("redis://localhost:6379"))
Content Transcoding¶
When a message is delivered to a channel that doesn't support the original content type, RoomKit automatically transcodes:
| Source Content | Target Capability | Transcoded Output |
|---|---|---|
| RichContent (HTML) | Text only | Plain text fallback body |
| MediaContent | Text only | Caption + URL as text |
| LocationContent | Text only | "[Location: label (lat, lon)]" |
| AudioContent | Text only | Transcript text or "[Voice message]" |
| VideoContent | Text only | "[Video: url]" |
| CompositeContent | Text only | Recursively transcoded parts, flattened |
| TemplateContent | Text only | Template body text |
| TextContent | Any | Always passed through |
If a content type cannot be transcoded for the target, the delivery to that channel is skipped with a transcoding_failed warning.
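A text-only fallback along the lines of the table can be sketched with a few dataclasses. The classes below (Text, Media, Location, Composite) are simplified, hypothetical stand-ins rather than RoomKit's content models, and joining composite parts with newlines is an assumption about what "flattened" means.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Text:
    body: str


@dataclass
class Media:
    url: str
    caption: Optional[str] = None


@dataclass
class Location:
    latitude: float
    longitude: float
    label: str = ""


@dataclass
class Composite:
    parts: List[object] = field(default_factory=list)


def to_text(content: object) -> str:
    """Transcode a content object for a text-only target."""
    if isinstance(content, Text):
        return content.body  # text always passes through
    if isinstance(content, Media):
        # Caption + URL as text; URL alone when there is no caption.
        return f"{content.caption} {content.url}" if content.caption else content.url
    if isinstance(content, Location):
        return f"[Location: {content.label} ({content.latitude}, {content.longitude})]"
    if isinstance(content, Composite):
        # Recursively transcode each part, then flatten into one string.
        return "\n".join(to_text(part) for part in content.parts)
    # Mirrors the "delivery skipped" case: the caller decides what to do.
    raise TypeError(f"cannot transcode {type(content).__name__} to text")
```

A caller would catch the TypeError and skip delivery to that channel, which corresponds to the transcoding_failed warning described above.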
Identity Resolution¶
Pluggable identity pipeline for identifying inbound message senders:
class MyIdentityResolver(IdentityResolver):
    async def resolve(self, message: InboundMessage, context: RoomContext) -> IdentityResult:
        user = await lookup_user(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)
kit = RoomKit(identity_resolver=MyIdentityResolver())
Identity statuses and their hooks:
| Status | Hook Trigger | Use Case |
|---|---|---|
| IDENTIFIED | ON_PARTICIPANT_IDENTIFIED | User found, proceed normally |
| PENDING | ON_IDENTITY_AMBIGUOUS | Awaiting asynchronous verification |
| AMBIGUOUS | ON_IDENTITY_AMBIGUOUS | Multiple candidates; prompt for clarification |
| UNKNOWN | ON_IDENTITY_UNKNOWN | No match found; request identification |
| CHALLENGE_SENT | -- | Identity hook sent verification challenge; message blocked |
| REJECTED | -- | Identity verification failed; message blocked |
Identity hooks can return IdentityHookResult with:
- resolved() -- Provide the resolved identity
- pending() -- Keep the participant in pending state
- challenge() -- Send a verification message and block the original
- reject() -- Block the message with a reason
After-the-fact resolution is also supported via resolve_participant().
Channel Support¶
Feature Matrix¶
| Feature | WebSocket | SMS | Email | Messenger | WhatsApp | HTTP | AI |
|---|---|---|---|---|---|---|---|
| Text | x | x | x | x | x | x | x |
| Rich text | x | -- | x | x | x | x | x |
| Media | x | -- | x | x | x | -- | *[1] |
| Location | -- | -- | -- | -- | x | -- | -- |
| Templates | -- | -- | -- | x | x | -- | -- |
| Buttons | -- | -- | -- | x | x | -- | -- |
| Quick replies | -- | -- | -- | x | x | -- | -- |
| Threading | -- | -- | x | -- | -- | -- | -- |
| Reactions | x | -- | -- | -- | x | -- | -- |
| Read receipts | x | x | -- | x | x | -- | -- |
| Typing indicators | x | -- | -- | -- | -- | -- | -- |
| Max length | -- | 1600 | -- | 2000 | 4096 | -- | -- |
| Bidirectional | x | x | x | x | x | x | x |
| Category | Transport | Transport | Transport | Transport | Transport | Transport | Intelligence |
[1] AI channels support media when the provider has vision capability (supports_vision=True). See Vision Support.
WebSocket Channel¶
Real-time bidirectional communication via callback-based connection registry:
ws = WebSocketChannel("ws-user")
kit.register_channel(ws)
# Register a connection and send function
await kit.connect_websocket("ws-user", "conn-123", send_fn)
# Later: disconnect
await kit.disconnect_websocket("ws-user", "conn-123")
The WebSocketChannel is the only transport channel with a dedicated class (not using TransportChannel). It supports typing indicators, reactions, and read receipts.
SMS Channel¶
SMS transport with 1600-character limit and provider abstraction:
from roomkit import SMSChannel, TelnyxSMSProvider, TelnyxConfig
provider = TelnyxSMSProvider(TelnyxConfig(
api_key="KEY...",
from_number="+15551234567",
))
sms = SMSChannel("sms-channel", provider=provider)
The recipient phone number is read from binding.metadata["phone_number"] at delivery time.
Available providers:
| Provider | Status | Features |
|---|---|---|
| Sinch | Implemented | SMS/MMS send, webhook parsing, signature verification (HMAC-SHA1) |
| Telnyx | Implemented | SMS/MMS send, webhook parsing, signature verification (ED25519) |
| Twilio | Implemented | SMS/MMS send, webhook parsing, signature verification (HMAC-SHA1) |
| VoiceMeUp | Implemented | SMS/MMS send, webhook parsing, MMS aggregator |
MMS Support¶
RoomKit supports MMS (Multimedia Messaging Service) through SMS providers. When an MMS arrives, the event's channel_type is automatically set to mms instead of sms.
Provider differences:
| Provider | MMS Webhook Behavior |
|---|---|
| Twilio | Single webhook with all media URLs |
| Sinch | Single webhook with media array |
| Telnyx | Single webhook with media array |
| VoiceMeUp | Split webhooks — automatic aggregation in parse_voicemeup_webhook() |
Media URL re-hosting: Provider media URLs should be re-hosted before forwarding to other channels (especially AI channels). Provider URLs may be temporary, blocked by certain services, or inaccessible. See the SMS Providers documentation for details.
VoiceMeUp MMS handling: VoiceMeUp sends MMS as two separate webhooks (text + metadata, then image). parse_voicemeup_webhook() automatically buffers and merges them into a single event. See the VoiceMeUp documentation for details.
SMS Utilities:
RoomKit includes utilities to simplify working with SMS webhooks and phone numbers:
from roomkit import extract_sms_meta, normalize_phone, WebhookMeta
# Extract normalized metadata from any provider's webhook payload
meta: WebhookMeta = extract_sms_meta("twilio", payload)
print(f"From: {meta.sender}, Body: {meta.body}")
# Convert directly to InboundMessage (parse once, use everywhere)
sender = normalize_phone(meta.sender, "CA")
inbound = meta.to_inbound(channel_id="sms-channel")
result = await kit.process_inbound(inbound)
# Normalize phone numbers to E.164 format (requires phonenumbers)
normalized = normalize_phone("418-555-1234", "CA") # → "+14185551234"
Webhook signature verification:
# Telnyx (ED25519, requires pynacl)
from roomkit import TelnyxSMSProvider, TelnyxConfig
telnyx = TelnyxSMSProvider(
TelnyxConfig(api_key="KEY...", from_number="+15551234567"),
public_key="your-telnyx-public-key-base64",
)
is_valid = telnyx.verify_signature(
payload=request.body,
signature=request.headers["Telnyx-Signature-Ed25519"],
timestamp=request.headers["Telnyx-Timestamp"],
)
# Twilio (HMAC-SHA1)
from roomkit import TwilioSMSProvider, TwilioConfig
twilio = TwilioSMSProvider(TwilioConfig(
account_sid="AC...", auth_token="...", from_number="+15551234567"
))
is_valid = twilio.verify_signature(
payload=request.body,
signature=request.headers["X-Twilio-Signature"],
url=str(request.url), # Full URL required for Twilio
)
# Sinch (HMAC-SHA1)
from roomkit import SinchSMSProvider, SinchConfig
sinch = SinchSMSProvider(SinchConfig(
service_plan_id="...", api_token="...", from_number="+15551234567",
webhook_secret="your-secret",
))
is_valid = sinch.verify_signature(
payload=request.body,
signature=request.headers["X-Sinch-Signature"],
)
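The HMAC-based checks above share a common core: recompute a keyed digest and compare it in constant time. The sketch below shows only that generic core using the standard library. It is not the exact scheme of any provider listed here, since each provider defines what is signed and how the digest is encoded; sign and verify_signature are hypothetical helpers, and base64 encoding is an assumption.

```python
import base64
import hashlib
import hmac


def sign(secret: bytes, payload: bytes) -> str:
    """Return a base64-encoded HMAC-SHA1 digest of the payload."""
    digest = hmac.new(secret, payload, hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


def verify_signature(secret: bytes, payload: bytes, signature: str) -> bool:
    """Recompute the digest and compare in constant time."""
    expected = sign(secret, payload)
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(expected, signature)
```

Verification should run on the raw request body before any JSON or form parsing, because re-serializing a parsed payload can change the bytes that were signed.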
Email Channel¶
Email transport with threading support:
from roomkit import EmailChannel, ElasticEmailProvider, ElasticEmailConfig
provider = ElasticEmailProvider(ElasticEmailConfig(
api_key="...",
from_email="support@example.com",
from_name="Support Team",
))
email = EmailChannel("email-channel", provider=provider)
The recipient email is read from binding.metadata["email_address"]. Available providers: ElasticEmail (implemented), SendGrid (scaffolded).
Messenger Channel¶
Facebook Messenger integration with rich interactive elements:
from roomkit import MessengerChannel, FacebookMessengerProvider, MessengerConfig
provider = FacebookMessengerProvider(MessengerConfig(
page_access_token="...",
app_secret="...",
))
messenger = MessengerChannel("fb-channel", provider=provider)
The recipient ID is read from binding.metadata["facebook_user_id"]. The channel supports buttons (max 3), quick replies, and templates, and includes parse_messenger_webhook() for inbound webhook parsing.
WhatsApp Channel¶
WhatsApp Business integration:
The recipient phone number is read from binding.metadata["phone_number"]. The channel supports text, rich text, media, location, templates, buttons (max 3), and quick replies. Max message length: 4096 characters. It is currently mock-only; there is no production provider.
HTTP Webhook Channel¶
Generic webhook transport for custom integrations:
from roomkit import HTTPChannel, WebhookHTTPProvider, HTTPProviderConfig
provider = WebhookHTTPProvider(HTTPProviderConfig(
webhook_url="https://example.com/webhook",
))
http = HTTPChannel("http-channel", provider=provider)
The recipient ID is read from binding.metadata["recipient_id"]. The channel includes parse_http_webhook() for inbound webhook parsing.
Resilience Features¶
Circuit Breaker¶
Automatic fault isolation for failing provider channels:
- Closed -- Normal operation; requests flow through
- Open -- After 5 consecutive failures, all requests are rejected immediately
- Half-open -- After 60s recovery timeout, allows one probe request
- Successful probe resets to Closed; failure keeps Open
- Per-channel instances managed by the EventRouter
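The three states can be sketched as a small class. The thresholds (5 failures, 60s recovery) come from the list above; the class and method names, the injectable clock, and the single-probe bookkeeping are assumptions made for this illustration and may differ from what the EventRouter actually does.

```python
import time


class CircuitBreaker:
    """Minimal closed / open / half-open breaker (illustrative only)."""

    def __init__(self, failure_threshold=5, recovery_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock            # injectable so tests can control time
        self._failures = 0
        self._opened_at = None         # None means the breaker is closed
        self._probe_in_flight = False

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        state = self.state
        if state == "closed":
            return True
        if state == "half_open" and not self._probe_in_flight:
            self._probe_in_flight = True  # let exactly one probe through
            return True
        return False

    def record_success(self) -> None:
        # A successful call (or probe) resets the breaker to closed.
        self._failures = 0
        self._opened_at = None
        self._probe_in_flight = False

    def record_failure(self) -> None:
        self._failures += 1
        if self._probe_in_flight or self._failures >= self.failure_threshold:
            # Open (or re-open after a failed probe) and restart the timer.
            self._opened_at = self._clock()
            self._probe_in_flight = False
```

A caller would check allow_request() before each delivery and report the result with record_success() or record_failure().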
Rate Limiting¶
Per-channel token bucket rate limiter:
await kit.attach_channel("room-1", "sms-channel",
rate_limit=RateLimit(max_per_second=1.0),
metadata={"phone_number": "+15551234567"},
)
Configurable via max_per_second, max_per_minute, or max_per_hour. Uses wait-based backpressure (queues requests until a token is available).
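Wait-based backpressure with a token bucket can be sketched as follows. This is a minimal illustration, not RoomKit's limiter: TokenBucket, its parameters, and the default capacity of one token are assumptions.

```python
import asyncio
import time


class TokenBucket:
    """Token bucket that makes callers wait instead of dropping requests."""

    def __init__(self, rate_per_second: float, capacity: float = 1.0):
        self.rate = rate_per_second
        self.capacity = capacity
        self._tokens = capacity
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()  # serializes waiters in arrival order

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._updated
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._updated = now

    async def acquire(self) -> None:
        async with self._lock:
            self._refill()
            while self._tokens < 1.0:
                # Sleep just long enough for the missing fraction to refill.
                await asyncio.sleep((1.0 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1.0
```

Calling await bucket.acquire() before each send spaces deliveries at roughly 1 / rate seconds once the initial burst capacity is used up.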
Retry with Backoff¶
Configurable exponential backoff for transient delivery failures:
await kit.attach_channel("room-1", "email-channel",
retry_policy=RetryPolicy(
max_retries=3,
base_delay_seconds=1.0,
max_delay_seconds=60.0,
exponential_base=2.0, # delay = base_delay_seconds * exponential_base ** attempt
),
metadata={"email_address": "user@example.com"},
)
After exhausting all retries, the last exception is raised and recorded in the circuit breaker.
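The delay schedule implied by these parameters can be worked through in a few lines. The sketch assumes attempts are counted from zero and that max_delay_seconds acts as a cap; backoff_delay and deliver_with_retry are hypothetical helpers, not RoomKit functions.

```python
import asyncio


def backoff_delay(attempt, base_delay_seconds=1.0, max_delay_seconds=60.0,
                  exponential_base=2.0):
    """Delay before retry number `attempt` (0-based), capped at the maximum."""
    return min(max_delay_seconds, base_delay_seconds * exponential_base ** attempt)


async def deliver_with_retry(send, max_retries=3, sleep=asyncio.sleep):
    """Call `send` until it succeeds or the retries are exhausted."""
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last exception
            await sleep(backoff_delay(attempt))
```

With the values shown above (base 1.0, factor 2.0, three retries), the waits between attempts are 1, 2, and 4 seconds.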
Chain Depth Limiting¶
Prevents infinite loops when AI channels generate responses that trigger other AI channels:
Events exceeding the chain depth limit are stored with status=BLOCKED and blocked_by="event_chain_depth_limit". An Observation is created documenting the exceeded depth. A framework event chain_depth_exceeded is emitted.
Idempotency¶
Duplicate message detection via idempotency keys:
result = await kit.process_inbound(InboundMessage(
channel_id="sms-channel",
sender_id="+15551234567",
content=TextContent(body="Hello"),
idempotency_key="provider-msg-id-12345",
))
The idempotency check is performed inside the room lock to prevent TOCTOU races.
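Why the check belongs inside the lock can be seen in a small stand-alone sketch. RoomProcessor and its in-memory key set are hypothetical; the point is only that the "have we seen this key?" test and the "remember this key" write happen under the same per-room lock, so concurrent duplicates cannot both pass the check.

```python
import asyncio
from collections import defaultdict


class RoomProcessor:
    """Illustrative per-room lock plus idempotency-key bookkeeping."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)  # one lock per room_id
        self._seen = defaultdict(set)            # idempotency keys per room
        self.stored = []

    async def process(self, room_id: str, idempotency_key: str, body: str) -> bool:
        async with self._locks[room_id]:
            if idempotency_key in self._seen[room_id]:
                return False  # duplicate: rejected
            # Stand-in for awaiting hooks and storage; without the lock,
            # another duplicate could slip past the check during this await.
            await asyncio.sleep(0)
            self._seen[room_id].add(idempotency_key)
            self.stored.append(body)
            return True
```

If five copies of the same provider message arrive at once, only the first to take the lock is stored and the rest are rejected.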
Participant Roles and Permissions¶
Roles¶
| Role | Description |
|---|---|
| OWNER | Room creator with full control |
| AGENT | Support agent or operator |
| MEMBER | Regular participant |
| OBSERVER | Read-only observer (analytics, compliance) |
| BOT | Automated participant (AI, webhook) |
Participant Statuses¶
| Status | Description |
|---|---|
| ACTIVE | Currently participating |
| INACTIVE | Temporarily away |
| LEFT | Has left the room |
| BANNED | Removed from the room |
Channel Access Control¶
Access levels control what each channel can do within a room:
| Access | Can Send | Receives Broadcasts |
|---|---|---|
| READ_WRITE | x | x |
| READ_ONLY | -- | x |
| WRITE_ONLY | x | -- |
| NONE | -- | -- |
Access can be changed dynamically:
await kit.set_access("room-1", "observer-channel", Access.READ_ONLY)
await kit.set_visibility("room-1", "ai-channel", "intelligence")
Muting¶
Muting suppresses a channel's response events without detaching it:
await kit.mute("room-1", "ai-bot") # AI stops responding but keeps analyzing
await kit.unmute("room-1", "ai-bot") # AI resumes responding
Muted channels still receive events via on_event() and can produce tasks and observations. Only their response_events are suppressed. This enables "silent observer" patterns where AI monitors without participating.
Observability¶
Framework Events¶
RoomKit emits FrameworkEvent objects for observability via the @kit.on() decorator:
@kit.on("room_created")
async def handle_room_created(event: FrameworkEvent) -> None:
    print(f"Room created: {event.data}")
Framework event types:
- room_created, room_closed, room_paused
- room_channel_attached, room_channel_detached
- event_blocked, event_processed
- delivery_succeeded, delivery_failed
- broadcast_partial_failure
- chain_depth_exceeded
- identity_timeout, process_timeout
- hook_error
- channel_connected, channel_disconnected (WebSocket)
Hook-Based Monitoring¶
Async hooks can be used for real-time monitoring and analytics:
@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC)
async def monitor(event: RoomEvent, ctx: RoomContext) -> None:
    await metrics.increment("messages_processed")
    await metrics.gauge("room_event_count", ctx.room.event_count)
Side Effects: Tasks and Observations¶
Hooks and intelligence channels can produce structured side effects:
- Tasks -- Work items with status tracking (PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED). Include title, description, assigned_to, and metadata.
- Observations -- Intelligence findings with category (e.g., "sentiment", "compliance_flag"), confidence score (0-1), and metadata.
Both are persisted via the ConversationStore and queryable per room:
tasks = await kit.list_tasks("room-1", status="pending")
observations = await kit.list_observations("room-1")
User Workflows¶
Customer Support Flow¶
sequenceDiagram
participant Customer as Customer (SMS)
participant Room as RoomKit Room
participant AI as AI Assistant
participant Agent as Agent (WebSocket)
Customer->>Room: "I need help with my account"
Room->>AI: Event broadcast
AI->>Room: AI response: "I can help! What's your account number?"
Room->>Customer: SMS delivery (transcoded to text)
Room->>Agent: WebSocket delivery (agent sees conversation)
Customer->>Room: "Account #12345"
Room->>AI: Event broadcast
AI->>Room: AI response with account lookup
Room->>Customer: SMS delivery
Room->>Agent: WebSocket delivery
Note over Agent: Agent takes over
Agent->>Room: "Let me handle this personally"
Room->>Customer: SMS delivery
Room->>AI: Event broadcast (AI observes)
Multi-Channel Notification Flow¶
sequenceDiagram
participant System as Backend System
participant Room as RoomKit Room
participant WS as WebSocket (App)
participant Email as Email
participant SMS as SMS
System->>Room: System event via send_event()
Room->>WS: Rich notification (real-time)
Room->>Email: Full notification (HTML, threaded)
Room->>SMS: Text summary (max 1600 chars)
Identity Verification Flow¶
sequenceDiagram
participant User as Unknown Sender (SMS)
participant Room as RoomKit Room
participant IR as Identity Resolver
participant Hook as Identity Hook
User->>Room: Inbound message
Room->>IR: resolve(message, context)
IR-->>Room: UNKNOWN
Room->>Hook: ON_IDENTITY_UNKNOWN
Hook-->>Room: IdentityHookResult.challenge()
Note over Hook: Injects verification message
Room->>User: "Please verify: reply with your DOB"
Note over Room: Original message blocked
User->>Room: "1990-01-15"
Room->>IR: resolve(message, context)
IR-->>Room: IDENTIFIED (identity resolved)
Note over Room: Message proceeds through pipeline
Dynamic Channel Management¶
sequenceDiagram
participant Admin as Admin
participant Room as RoomKit Room
participant AI as AI Channel
participant SMS as SMS Channel
Admin->>Room: attach_channel("ai-bot")
Note over Room: AI starts responding to messages
Admin->>Room: mute("ai-bot")
Note over AI: AI still receives events, produces tasks/observations
Note over AI: But response messages are suppressed
Admin->>Room: set_access("sms-out", READ_ONLY)
Note over SMS: SMS can only observe, cannot send
Admin->>Room: unmute("ai-bot")
Note over AI: AI resumes responding
Admin->>Room: detach_channel("sms-out")
Note over SMS: SMS removed from room
Integration Points¶
Inbound Message Ingestion¶
External systems deliver messages to RoomKit via process_inbound():
# From a FastAPI webhook handler
@app.post("/webhook/sms")
async def sms_webhook(request: Request):
    payload = await request.json()
    message = parse_voicemeup_webhook(payload)
    result = await kit.process_inbound(message)
    return {"status": "ok", "blocked": result.blocked}
Direct Event Injection¶
Send events directly into a room without going through the inbound pipeline:
event = await kit.send_event(
room_id="room-1",
channel_id="system-channel",
content=TextContent(body="System maintenance in 5 minutes"),
)
Webhook Parsers¶
Built-in webhook parsers for provider-specific payloads:
- parse_sinch_webhook() -- Sinch SMS webhooks
- parse_telnyx_webhook() -- Telnyx SMS webhooks
- parse_twilio_webhook() -- Twilio SMS webhooks (form-encoded)
- parse_voicemeup_webhook() -- VoiceMeUp SMS webhooks
- parse_messenger_webhook() -- Facebook Messenger webhooks
- parse_http_webhook() -- Generic HTTP webhook payloads
- extract_sms_meta() -- Normalized metadata extraction for any SMS provider
Custom Storage Backends¶
Implement ConversationStore for any persistence layer:
class PostgresStore(ConversationStore):
    async def create_room(self, room: Room) -> Room:
        async with self.pool.acquire() as conn:
            await conn.execute("INSERT INTO rooms ...", room.model_dump())
        return room
    # ... implement remaining 29 abstract methods
Custom AI Providers¶
Implement AIProvider for any AI service:
class CustomAIProvider(AIProvider):
    async def generate(self, context: AIContext) -> AIResponse:
        response = await my_ai_client.chat(context.messages)
        return AIResponse(content=response.text, usage={"tokens": response.usage})
Custom Identity Resolvers¶
Implement IdentityResolver for any user directory:
class CRMIdentityResolver(IdentityResolver):
    async def resolve(self, message: InboundMessage, context: RoomContext) -> IdentityResult:
        user = await crm.lookup_by_phone(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)
Custom Inbound Routers¶
Override the default room routing strategy:
class TenantRouter(InboundRoomRouter):
    async def route(self, channel_id, channel_type, participant_id) -> str | None:
        # Route to room based on tenant-specific logic
        return await self.lookup_tenant_room(participant_id)
kit = RoomKit(inbound_router=TenantRouter())
Current Limitations¶
- Single-process only (default) -- InMemoryLockManager and InMemoryRealtime use asyncio primitives; distributed deployments require custom RoomLockManager and RealtimeBackend implementations
- In-memory storage only (default) -- No persistent store ships with the library; production use requires a custom ConversationStore
- No built-in HTTP server -- RoomKit is a library, not a server; webhook endpoints must be provided by the host application
- No file/media storage -- MediaContent stores URLs; actual file hosting must be handled externally
- No end-to-end encryption -- Content is available in plaintext within the pipeline; encryption must be handled at the transport level
- Limited WhatsApp support -- Mock provider only; no production WhatsApp Business API integration
- No voice channel -- ChannelType.VOICE is defined but not implemented
- No push notification channel -- ChannelType.PUSH is defined but not implemented
- No pause_room() method -- Rooms can only be paused via timers (check_room_timers), not directly
Potential Enhancements¶
- Persistent storage backends -- PostgreSQL, Redis, or MongoDB ConversationStore implementations
- Distributed locking -- Redis-based RoomLockManager for multi-process deployments
- Distributed realtime -- Redis pub/sub or NATS-based RealtimeBackend for multi-process deployments
- Event streaming -- Kafka or Redis Streams integration for cross-service event distribution
- OpenTelemetry integration -- Built-in tracing and metrics via the FrameworkEvent system
- Voice channel -- WebRTC or telephony integration via Twilio Voice or similar
- Push notifications -- Firebase Cloud Messaging or APNs provider
- WhatsApp Business API -- Full provider implementation with template management
- Message search -- Full-text search across room events
- File storage abstraction -- S3/GCS integration for media content hosting
- Admin dashboard -- Web UI for room management and monitoring
- Rate limit queuing -- Queue-and-drain instead of drop when rate limited
- Direct pause/resume -- Explicit pause_room() and resume_room() methods