
Features

Core Features

Multi-Channel Conversation Rooms

RoomKit provides a room-based abstraction for managing conversations across multiple communication channels simultaneously. A room is a shared conversation context where messages flow between all attached channels.

from roomkit import AIChannel, ChannelCategory, RoomKit, SMSChannel, WebSocketChannel

kit = RoomKit()

# Register channels
kit.register_channel(WebSocketChannel("ws-user"))
kit.register_channel(SMSChannel("sms-user", provider=sms_provider))
kit.register_channel(AIChannel("ai-bot", provider=ai_provider))

# Create a room and attach channels
room = await kit.create_room(room_id="support-123")
await kit.attach_channel("support-123", "ws-user")
await kit.attach_channel("support-123", "sms-user",
    metadata={"phone_number": "+15551234567"})
await kit.attach_channel("support-123", "ai-bot", category=ChannelCategory.INTELLIGENCE)

When a message arrives on any channel, it is automatically broadcast to all other attached channels, with content transcoded as needed for each target's capabilities.

Room Lifecycle Management

Rooms follow a state machine with four statuses:

stateDiagram-v2
    [*] --> ACTIVE: create_room()
    ACTIVE --> PAUSED: pause (timer or manual)
    PAUSED --> ACTIVE: (resume on activity)
    ACTIVE --> CLOSED: close_room() or timer
    PAUSED --> CLOSED: close_room() or timer
    CLOSED --> ARCHIVED: (archive)
  • ACTIVE -- Messages flow normally between all attached channels
  • PAUSED -- Room is temporarily suspended; the auto-close timer keeps running
  • CLOSED -- Conversation is ended; no new messages accepted
  • ARCHIVED -- Final state for long-term storage
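The diagram can be read as a transition table. The sketch below is a hypothetical model, not RoomKit's code: `RoomStatus`, `ALLOWED` and `transition()` are illustrative names. It rejects any move the diagram does not show, such as reopening a closed room.

```python
from enum import Enum


class RoomStatus(Enum):
    # Hypothetical mirror of the four statuses in the diagram above
    ACTIVE = "active"
    PAUSED = "paused"
    CLOSED = "closed"
    ARCHIVED = "archived"


# Allowed transitions, read directly off the state diagram
ALLOWED: dict[RoomStatus, set[RoomStatus]] = {
    RoomStatus.ACTIVE: {RoomStatus.PAUSED, RoomStatus.CLOSED},
    RoomStatus.PAUSED: {RoomStatus.ACTIVE, RoomStatus.CLOSED},
    RoomStatus.CLOSED: {RoomStatus.ARCHIVED},
    RoomStatus.ARCHIVED: set(),  # terminal state: no way out
}


def transition(current: RoomStatus, target: RoomStatus) -> RoomStatus:
    """Return the new status, or raise if the diagram forbids the move."""
    if target not in ALLOWED[current]:
        raise ValueError(f"cannot go from {current.name} to {target.name}")
    return target
```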

Timer-based automation:

from roomkit import RoomTimers

await kit.create_room(
    room_id="support-123",
    metadata={"timers": RoomTimers(
        inactive_after_seconds=300,     # Auto-pause after 5min inactivity
        closed_after_seconds=3600,      # Auto-close after 1hr inactivity
    )},
)

# Check and apply timer transitions for all active/paused rooms
transitioned = await kit.check_all_timers()
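The following is a minimal sketch of the per-room decision a timer sweep like check_all_timers() has to make. It assumes both thresholds are measured from the room's last activity, as the comments above suggest, and that closing takes precedence when both have elapsed. `TimerConfig` and `next_status()` are hypothetical names, not part of RoomKit.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class TimerConfig:
    # Hypothetical stand-in for RoomTimers; None disables that timer
    inactive_after_seconds: Optional[float] = None
    closed_after_seconds: Optional[float] = None


def next_status(status: str, last_activity: datetime, now: datetime,
                timers: TimerConfig) -> str:
    """Return the status a room should move to, given how long it has been idle."""
    if status not in ("active", "paused"):
        return status  # closed and archived rooms are not touched by timers
    idle = (now - last_activity).total_seconds()
    # Check closing first, so a room idle past both thresholds closes directly
    if timers.closed_after_seconds is not None and idle >= timers.closed_after_seconds:
        return "closed"
    if (status == "active" and timers.inactive_after_seconds is not None
            and idle >= timers.inactive_after_seconds):
        return "paused"
    return status


# Example: a room idle for 10 minutes, with the timers configured above
start = datetime(2024, 1, 1, 12, 0)
print(next_status("active", start, start + timedelta(minutes=10), TimerConfig(300, 3600)))  # paused
```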

Event Pipeline

Every message passes through a deterministic processing pipeline:

  1. Inbound routing -- Resolve which room the message belongs to (by channel binding or participant)
  2. Auto-create -- If no room found, create a new room and attach the channel
  3. Channel conversion -- handle_inbound() converts the raw message to a RoomEvent
  4. Identity resolution -- Identify the sender (optional, with timeout)
  5. Room lock -- Acquire per-room lock for atomic processing
  6. Idempotency check -- Reject duplicate messages by idempotency_key
  7. Sync hooks -- Content filtering, modification, or blocking (BEFORE_BROADCAST)
  8. Event storage -- Persist the event to the conversation store
  9. Broadcast -- Deliver to all eligible channels via the EventRouter
  10. Reentry drain -- Process AI response events in a loop (bounded by max_chain_depth)
  11. Side effects -- Persist tasks and observations
  12. Async hooks -- Side effects, logging, analytics (AFTER_BROADCAST)
  13. Activity update -- Update room timestamp and latest event index
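Steps 7 to 12 can be condensed into the sketch below, which shows why their order matters: a blocked event never reaches storage, broadcast, or the async hooks. It assumes a sync hook returns either a block reason or None. `MiniPipeline`, `Event` and the hook signatures are hypothetical simplifications, not RoomKit's classes. Routing, identity, locking and reentry are omitted.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional


@dataclass
class Event:
    # Hypothetical stand-in for RoomEvent
    room_id: str
    text: str
    blocked_reason: Optional[str] = None


# A sync hook returns a block reason, or None to allow; an async hook returns nothing
SyncHook = Callable[[Event], Awaitable[Optional[str]]]
AsyncHook = Callable[[Event], Awaitable[None]]


@dataclass
class MiniPipeline:
    sync_hooks: list[SyncHook] = field(default_factory=list)
    async_hooks: list[AsyncHook] = field(default_factory=list)
    store: list[Event] = field(default_factory=list)      # step 8: persisted events
    delivered: list[str] = field(default_factory=list)    # step 9: broadcast payloads

    async def process(self, event: Event) -> Event:
        # Step 7: sync hooks run in order; the first block short-circuits the rest
        for hook in self.sync_hooks:
            reason = await hook(event)
            if reason is not None:
                event.blocked_reason = reason
                return event  # in this sketch, blocked events are neither stored nor broadcast
        self.store.append(event)           # step 8: store
        self.delivered.append(event.text)  # step 9: broadcast
        # Step 12: async hooks (awaited together here for simplicity)
        await asyncio.gather(*(hook(event) for hook in self.async_hooks))
        return event
```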

Hook System

Hooks intercept events at specific points in the pipeline for business logic injection.

Sync hooks run before broadcast and can block, allow, or modify events:

@kit.hook(HookTrigger.BEFORE_BROADCAST, name="profanity_filter")
async def profanity_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    text = Channel.extract_text(event)
    if contains_profanity(text):
        return HookResult.block("Message contains inappropriate language")
    return HookResult.allow()

Async hooks run after broadcast for side effects:

@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC, name="logger")
async def log_event(event: RoomEvent, ctx: RoomContext) -> None:
    await analytics.track("message_sent", {"room": event.room_id})

Hook features:

  • Priority ordering -- Hooks execute in priority order (lower numbers first)
  • Per-room hooks -- Attach hooks to specific rooms dynamically via add_room_hook()
  • Global hooks -- Apply to all rooms via the @kit.hook() decorator
  • Timeout protection -- Configurable timeout per hook (default 30s)
  • Error isolation -- Hook failures are logged and collected as hook_errors but don't crash the pipeline
  • Event injection -- Hooks can inject synthetic events via HookResult.injected_events
  • Task/observation creation -- Hooks can create side-effect tasks and observations
  • Event filtering -- Hooks can be filtered by channel type, channel ID, and direction
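Priority ordering, per-hook timeouts and error isolation from the list above combine roughly as in this sketch. `RegisteredHook` and `run_hooks()` are hypothetical names. The real hook runner also handles results, injected events and side effects, which are left out here.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class RegisteredHook:
    # Hypothetical registration record; lower priority numbers run first
    name: str
    priority: int
    fn: Callable[[str], Awaitable[None]]
    timeout: float = 30.0  # mirrors the documented 30s default


async def run_hooks(hooks: list[RegisteredHook], event: str) -> list[str]:
    """Run hooks in priority order; collect errors instead of raising them."""
    hook_errors: list[str] = []
    for hook in sorted(hooks, key=lambda h: h.priority):
        try:
            await asyncio.wait_for(hook.fn(event), timeout=hook.timeout)
        except asyncio.TimeoutError:
            hook_errors.append(f"{hook.name}: timed out after {hook.timeout}s")
        except Exception as exc:  # error isolation: one bad hook can't stop the others
            hook_errors.append(f"{hook.name}: {exc!r}")
    return hook_errors
```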

Hook filters restrict a hook to events from specific channel types, channel IDs, or directions:

from roomkit import ChannelType, ChannelDirection

# Only run for inbound SMS/MMS events
@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    name="rehost_media",
    channel_types={ChannelType.SMS, ChannelType.MMS},
    directions={ChannelDirection.INBOUND},
)
async def rehost_media(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Only called for inbound SMS/MMS, so there is no need to check inside
    ...
    return HookResult.allow()

# Only run for a specific channel
@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    name="voicemeup_specific",
    channel_ids={"sms-voicemeup"},
)
async def voicemeup_hook(event: RoomEvent, ctx: RoomContext) -> HookResult:
    ...
    return HookResult.allow()

Filter options:

  • channel_types: set[ChannelType] -- Only run for these channel types (e.g., {ChannelType.SMS})
  • channel_ids: set[str] -- Only run for these channel IDs (e.g., {"sms-voicemeup"})
  • directions: set[ChannelDirection] -- Only run for these directions (e.g., {ChannelDirection.INBOUND})
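The inbound SMS/MMS example above implies that filters combine as an AND across dimensions, and that an unset filter matches everything. The sketch below assumes exactly that. Plain strings stand in for ChannelType and ChannelDirection, and `HookFilter` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class HookFilter:
    # Hypothetical filter record; None means "no restriction" for that dimension
    channel_types: Optional[frozenset[str]] = None
    channel_ids: Optional[frozenset[str]] = None
    directions: Optional[frozenset[str]] = None

    def matches(self, channel_type: str, channel_id: str, direction: str) -> bool:
        # Every configured dimension must match (logical AND)
        return (
            (self.channel_types is None or channel_type in self.channel_types)
            and (self.channel_ids is None or channel_id in self.channel_ids)
            and (self.directions is None or direction in self.directions)
        )
```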

Hook triggers:

| Trigger | Execution | Use case |
|---|---|---|
| BEFORE_BROADCAST | Sync | Content filtering, modification, blocking |
| AFTER_BROADCAST | Async | Logging, analytics, notifications |
| ON_ROOM_CREATED | Async | Room initialization |
| ON_ROOM_PAUSED | Async | Inactivity alerts |
| ON_ROOM_CLOSED | Async | Cleanup, archival |
| ON_CHANNEL_ATTACHED | Async | Welcome messages |
| ON_CHANNEL_DETACHED | Async | Farewell messages |
| ON_CHANNEL_MUTED | Async | State tracking |
| ON_CHANNEL_UNMUTED | Async | State tracking |
| ON_IDENTITY_AMBIGUOUS | Both | Multi-candidate disambiguation |
| ON_IDENTITY_UNKNOWN | Both | Unknown sender handling |
| ON_PARTICIPANT_IDENTIFIED | Async | Post-identification actions |
| ON_TASK_CREATED | Async | Task routing |
| ON_ERROR | Async | Error monitoring |

AI Intelligence Layer

AIChannel is a channel in the INTELLIGENCE category that generates AI responses:

from roomkit import AIChannel, AnthropicAIProvider, AnthropicConfig

provider = AnthropicAIProvider(AnthropicConfig(api_key="sk-..."))
ai = AIChannel(
    "ai-assistant",
    provider=provider,
    system_prompt="You are a helpful customer support agent.",
    temperature=0.7,
    max_tokens=1024,
    max_context_events=50,  # Window of conversation history
)

AI features:

  • Context-aware -- Builds conversation context from recent room events
  • Self-loop prevention -- Skips events from itself to prevent self-echoing
  • Chain depth limiting -- Global max_chain_depth (default 5) prevents runaway AI-to-AI loops; exceeded events are stored as BLOCKED with an observation
  • Provider-agnostic -- Swap between Anthropic, OpenAI, PydanticAI, or custom providers
  • Capability-aware generation -- AI considers target transport channel capabilities when generating responses
  • Mute-aware -- Muted AI channels still process events (tasks, observations) but suppress response messages
  • Vision support -- Providers with vision capability can receive and process images

Vision Support

AI providers can optionally support vision (image processing) by setting supports_vision=True. When enabled:

  • The AIChannel.capabilities() includes MEDIA in supported media types
  • The transcoder passes images through instead of converting to text
  • AIMessage.content becomes multimodal: str | list[AITextPart | AIImagePart]
from roomkit import AIChannel, AIProvider, AIContext, AIResponse, AITextPart, AIImagePart

class VisionAIProvider(AIProvider):
    @property
    def supports_vision(self) -> bool:
        return True  # Enable vision capability

    @property
    def model_name(self) -> str:
        return "gpt-4o"

    async def generate(self, context: AIContext) -> AIResponse:
        for msg in context.messages:
            if isinstance(msg.content, list):
                # Multimodal content: text and images
                for part in msg.content:
                    if isinstance(part, AITextPart):
                        print(f"Text: {part.text}")
                    elif isinstance(part, AIImagePart):
                        print(f"Image: {part.url} ({part.mime_type})")
            else:
                # Plain text content
                print(f"Text: {msg.content}")
        return AIResponse(content="I can see the image!")

# Use with AIChannel
provider = VisionAIProvider()
ai = AIChannel("ai-vision", provider=provider)

# Channel now supports MEDIA
caps = ai.capabilities()
assert caps.supports_media is True

When an MMS with an image arrives in a room with a vision-enabled AI:

MMS (text + image) → Room
AIChannel.capabilities() includes MEDIA
Transcoder: No conversion needed (AI supports MEDIA)
AIChannel extracts multimodal content:
  → [AITextPart("Check this!"), AIImagePart(url="https://...", mime_type="image/jpeg")]
AIProvider.generate() receives multimodal AIMessage
Provider sends image URL to vision model (Claude, GPT-4V, etc.)

The MockAIProvider supports vision for testing:

from roomkit import MockAIProvider

# Enable vision for testing
provider = MockAIProvider(responses=["I see a cat!"], vision=True)
assert provider.supports_vision is True

Realtime Events (Typing, Presence, Read Receipts)

RoomKit provides a pluggable realtime backend for ephemeral events that don't require persistence:

from roomkit import RoomKit, EphemeralEvent, EphemeralEventType

kit = RoomKit()  # Uses InMemoryRealtime by default

# Subscribe to ephemeral events for a room
async def on_realtime(event: EphemeralEvent):
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")
    elif event.type == EphemeralEventType.READ_RECEIPT:
        print(f"{event.user_id} read event {event.data['event_id']}")

sub_id = await kit.subscribe_room("room-123", on_realtime)

# Publish typing indicator
await kit.publish_typing("room-123", "user-456")

# Publish presence update
await kit.publish_presence("room-123", "user-456", "online")

# Publish read receipt
await kit.publish_read_receipt("room-123", "user-456", "event-789")

# Unsubscribe when done
await kit.unsubscribe_room(sub_id)

Event types:

| Type | Description |
|---|---|
| TYPING_START | User started typing |
| TYPING_STOP | User stopped typing |
| PRESENCE_ONLINE | User is online |
| PRESENCE_AWAY | User is away |
| PRESENCE_OFFLINE | User went offline |
| READ_RECEIPT | User read a message |
| CUSTOM | Custom ephemeral event |
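The sketch below shows the idea behind a single-process backend. It offers the same publish/subscribe/unsubscribe shape as the RealtimeBackend methods shown further down, with plain dicts standing in for EphemeralEvent. `TinyRealtime` is a hypothetical class written in the spirit of InMemoryRealtime, not its actual source.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable

Callback = Callable[[dict[str, Any]], Awaitable[None]]


class TinyRealtime:
    """Hypothetical single-process pub/sub keyed by channel name."""

    def __init__(self) -> None:
        self._subs: dict[str, tuple[str, Callback]] = {}  # sub_id -> (channel, callback)

    async def subscribe(self, channel: str, callback: Callback) -> str:
        sub_id = uuid.uuid4().hex
        self._subs[sub_id] = (channel, callback)
        return sub_id

    async def unsubscribe(self, sub_id: str) -> bool:
        # True if the subscription existed, False otherwise
        return self._subs.pop(sub_id, None) is not None

    async def publish(self, channel: str, event: dict[str, Any]) -> None:
        # Snapshot matching callbacks so a handler may unsubscribe while being called
        targets = [cb for ch, cb in list(self._subs.values()) if ch == channel]
        await asyncio.gather(*(cb(event) for cb in targets))
```

Because nothing leaves the process, two workers behind a load balancer would never see each other's events. That is the limitation the Redis example addresses.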

WebSocket integration example:

from fastapi import FastAPI, WebSocket
from roomkit import EphemeralEvent, RoomKit

app = FastAPI()
kit = RoomKit()

@app.websocket("/chat/{room_id}/{user_id}")
async def websocket_chat(websocket: WebSocket, room_id: str, user_id: str):
    await websocket.accept()

    # Forward realtime events to WebSocket
    async def forward_realtime(event: EphemeralEvent):
        await websocket.send_json(event.to_dict())

    sub_id = await kit.subscribe_room(room_id, forward_realtime)

    try:
        while True:
            data = await websocket.receive_json()
            if data["type"] == "typing":
                await kit.publish_typing(room_id, user_id)
            elif data["type"] == "read":
                await kit.publish_read_receipt(room_id, user_id, data["event_id"])
    finally:
        await kit.unsubscribe_room(sub_id)

Custom backend for distributed deployments:

The default InMemoryRealtime is single-process only. For distributed deployments, implement RealtimeBackend:

import json
import uuid

import redis.asyncio as redis
from roomkit import EphemeralCallback, EphemeralEvent, RealtimeBackend, RoomKit

class RedisRealtime(RealtimeBackend):
    def __init__(self, url: str = "redis://localhost:6379"):
        self._redis = redis.from_url(url)
        # ... implementation

    async def publish(self, channel: str, event: EphemeralEvent) -> None:
        await self._redis.publish(channel, json.dumps(event.to_dict()))

    async def subscribe(self, channel: str, callback: EphemeralCallback) -> str:
        sub_id = str(uuid.uuid4())
        # ... implementation: start a listener task for `channel` that awaits callback(event)
        return sub_id

    async def unsubscribe(self, subscription_id: str) -> bool:
        # ... implementation: cancel the listener task; return True if it existed
        ...

    async def close(self) -> None:
        await self._redis.close()

# Use custom backend
kit = RoomKit(realtime=RedisRealtime("redis://localhost:6379"))

Content Transcoding

When a message is delivered to a channel that doesn't support the original content type, RoomKit automatically transcodes:

| Source content | Target capability | Transcoded output |
|---|---|---|
| RichContent (HTML) | Text only | Plain-text fallback body |
| MediaContent | Text only | Caption + URL as text |
| LocationContent | Text only | "[Location: label (lat, lon)]" |
| AudioContent | Text only | Transcript text or "[Voice message]" |
| VideoContent | Text only | "[Video: url]" |
| CompositeContent | Text only | Recursively transcoded parts, flattened |
| TemplateContent | Text only | Template body text |
| TextContent | Any | Always passed through |

If a content type cannot be transcoded for the target, the delivery to that channel is skipped with a transcoding_failed warning.
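The text-only rows of the table amount to a single fallback function. This sketch covers a subset of content types using hypothetical dataclasses (`Media`, `Location`, `Audio` and `Composite` are not RoomKit's models). It also assumes that a composite fails as a whole if any of its parts cannot be transcoded, with None standing for "skip delivery".

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class Media:
    url: str
    caption: Optional[str] = None


@dataclass
class Location:
    label: str
    latitude: float
    longitude: float


@dataclass
class Audio:
    url: str
    transcript: Optional[str] = None


@dataclass
class Composite:
    parts: list["Content"]


Content = Union[str, Media, Location, Audio, Composite]


def to_text(content: object) -> Optional[str]:
    """Text fallback for a text-only target; None means 'cannot transcode, skip'."""
    if isinstance(content, str):
        return content  # text always passes through
    if isinstance(content, Media):
        return f"{content.caption} {content.url}" if content.caption else content.url
    if isinstance(content, Location):
        return f"[Location: {content.label} ({content.latitude}, {content.longitude})]"
    if isinstance(content, Audio):
        return content.transcript or "[Voice message]"
    if isinstance(content, Composite):
        texts = []
        for part in content.parts:  # recurse, then flatten into one text body
            text = to_text(part)
            if text is None:
                return None  # assumption: one untranscodable part fails the whole message
            texts.append(text)
        return "\n".join(texts)
    return None  # unknown content type: delivery would be skipped
```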

Identity Resolution

Pluggable identity pipeline for identifying inbound message senders:

class MyIdentityResolver(IdentityResolver):
    async def resolve(self, message: InboundMessage, context: RoomContext) -> IdentityResult:
        user = await lookup_user(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)

kit = RoomKit(identity_resolver=MyIdentityResolver())

Identity statuses and their hooks:

| Status | Hook trigger | Use case |
|---|---|---|
| IDENTIFIED | ON_PARTICIPANT_IDENTIFIED | User found; proceed normally |
| PENDING | ON_IDENTITY_AMBIGUOUS | Awaiting asynchronous verification |
| AMBIGUOUS | ON_IDENTITY_AMBIGUOUS | Multiple candidates; prompt for clarification |
| UNKNOWN | ON_IDENTITY_UNKNOWN | No match found; request identification |
| CHALLENGE_SENT | -- | Identity hook sent a verification challenge; message blocked |
| REJECTED | -- | Identity verification failed; message blocked |

Identity hooks can return IdentityHookResult with:

  • resolved() -- Provide the resolved identity
  • pending() -- Keep the participant in the pending state
  • challenge() -- Send a verification message and block the original
  • reject() -- Block the message with a reason

After-the-fact resolution is also supported via resolve_participant().


Channel Support

Feature Matrix

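| Feature | WebSocket | SMS | Email | Messenger | WhatsApp | HTTP | AI |
|---|---|---|---|---|---|---|---|
| Text | x | x | x | x | x | x | x |
| Rich text | x | -- | x | x | x | x | x |
| Media | x | -- | x | x | x | -- | [1] |
| Location | -- | -- | -- | -- | x | -- | -- |
| Templates | -- | -- | -- | x | x | -- | -- |
| Buttons | -- | -- | -- | x | x | -- | -- |
| Quick replies | -- | -- | -- | x | x | -- | -- |
| Threading | -- | -- | x | -- | -- | -- | -- |
| Reactions | x | -- | -- | -- | x | -- | -- |
| Read receipts | x | x | -- | x | x | -- | -- |
| Typing indicators | x | -- | -- | -- | -- | -- | -- |
| Max length (chars) | -- | 1600 | -- | 2000 | 4096 | -- | -- |
| Bidirectional | x | x | x | x | x | x | x |
| Category | Transport | Transport | Transport | Transport | Transport | Transport | Intelligence |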

[1] AI channels support media when the provider has vision capability (supports_vision=True). See Vision Support.

WebSocket Channel

Real-time bidirectional communication via callback-based connection registry:

ws = WebSocketChannel("ws-user")
kit.register_channel(ws)

# Register a connection and send function
await kit.connect_websocket("ws-user", "conn-123", send_fn)

# Later: disconnect
await kit.disconnect_websocket("ws-user", "conn-123")

The WebSocketChannel is the only transport channel with a dedicated class (not using TransportChannel). It supports typing indicators, reactions, and read receipts.

SMS Channel

SMS transport with 1600-character limit and provider abstraction:

from roomkit import SMSChannel, TelnyxSMSProvider, TelnyxConfig

provider = TelnyxSMSProvider(TelnyxConfig(
    api_key="KEY...",
    from_number="+15551234567",
))
sms = SMSChannel("sms-channel", provider=provider)

The recipient phone number is read from binding.metadata["phone_number"] at delivery time.

Available providers:

| Provider | Status | Features |
|---|---|---|
| Sinch | Implemented | SMS/MMS send, webhook parsing, signature verification (HMAC-SHA1) |
| Telnyx | Implemented | SMS/MMS send, webhook parsing, signature verification (ED25519) |
| Twilio | Implemented | SMS/MMS send, webhook parsing, signature verification (HMAC-SHA1) |
| VoiceMeUp | Implemented | SMS/MMS send, webhook parsing, MMS aggregator |

MMS Support

RoomKit supports MMS (Multimedia Messaging Service) through SMS providers. When an MMS arrives, the event's channel_type is automatically set to mms instead of sms.

Provider differences:

| Provider | MMS webhook behavior |
|---|---|
| Twilio | Single webhook with all media URLs |
| Sinch | Single webhook with media array |
| Telnyx | Single webhook with media array |
| VoiceMeUp | Split webhooks; automatic aggregation in parse_voicemeup_webhook() |

Media URL re-hosting: Provider media URLs should be re-hosted before forwarding to other channels (especially AI channels). Provider URLs may be temporary, blocked by certain services, or inaccessible. See the SMS Providers documentation for details.

VoiceMeUp MMS handling: VoiceMeUp sends MMS as two separate webhooks (text + metadata, then image). parse_voicemeup_webhook() automatically buffers and merges them into a single event. See the VoiceMeUp documentation for details.

SMS Utilities:

RoomKit includes utilities to simplify working with SMS webhooks and phone numbers:

from roomkit import extract_sms_meta, normalize_phone, WebhookMeta

# Extract normalized metadata from any provider's webhook payload
meta: WebhookMeta = extract_sms_meta("twilio", payload)
print(f"From: {meta.sender}, Body: {meta.body}")

# Convert directly to InboundMessage (parse once, use everywhere)
sender = normalize_phone(meta.sender, "CA")
inbound = meta.to_inbound(channel_id="sms-channel")
result = await kit.process_inbound(inbound)

# Normalize phone numbers to E.164 format (requires phonenumbers)
normalized = normalize_phone("418-555-1234", "CA")  # → "+14185551234"

Webhook signature verification:

# Telnyx (ED25519, requires pynacl)
from roomkit import TelnyxSMSProvider, TelnyxConfig

telnyx = TelnyxSMSProvider(
    TelnyxConfig(api_key="KEY...", from_number="+15551234567"),
    public_key="your-telnyx-public-key-base64",
)
is_valid = telnyx.verify_signature(
    payload=request.body,
    signature=request.headers["Telnyx-Signature-Ed25519"],
    timestamp=request.headers["Telnyx-Timestamp"],
)

# Twilio (HMAC-SHA1)
from roomkit import TwilioSMSProvider, TwilioConfig

twilio = TwilioSMSProvider(TwilioConfig(
    account_sid="AC...", auth_token="...", from_number="+15551234567"
))
is_valid = twilio.verify_signature(
    payload=request.body,
    signature=request.headers["X-Twilio-Signature"],
    url=str(request.url),  # Full URL required for Twilio
)

# Sinch (HMAC-SHA1)
from roomkit import SinchSMSProvider, SinchConfig

sinch = SinchSMSProvider(SinchConfig(
    service_plan_id="...", api_token="...", from_number="+15551234567",
    webhook_secret="your-secret",
))
is_valid = sinch.verify_signature(
    payload=request.body,
    signature=request.headers["X-Sinch-Signature"],
)
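Twilio needs the full URL because of how its HMAC-SHA1 scheme for form-encoded webhooks works. Twilio documents the scheme as follows: take the full request URL, append each POST parameter as name followed by value with the parameters sorted by name, sign the result with the auth token, and base64-encode the digest. The sketch below uses only the standard library and illustrates that scheme. It is not RoomKit's verify_signature(), which remains the method to use in practice. The function names are hypothetical.

```python
import base64
import hashlib
import hmac
from typing import Mapping


def twilio_style_signature(auth_token: str, url: str, params: Mapping[str, str]) -> str:
    # Signed string: full URL, then each POST parameter as name+value, sorted by name
    data = url + "".join(key + params[key] for key in sorted(params))
    digest = hmac.new(auth_token.encode(), data.encode(), hashlib.sha1).digest()
    return base64.b64encode(digest).decode()


def verify(auth_token: str, url: str, params: Mapping[str, str], header: str) -> bool:
    expected = twilio_style_signature(auth_token, url, params)
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(expected, header)
```

Because the URL is part of the signed string, a proxy that rewrites the scheme or host makes every signature fail. In that situation, reconstruct the public URL before verifying.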

Email Channel

Email transport with threading support:

from roomkit import EmailChannel, ElasticEmailProvider, ElasticEmailConfig

provider = ElasticEmailProvider(ElasticEmailConfig(
    api_key="...",
    from_email="support@example.com",
    from_name="Support Team",
))
email = EmailChannel("email-channel", provider=provider)

The recipient email is read from binding.metadata["email_address"]. Available providers: ElasticEmail (implemented), SendGrid (scaffolded).

Messenger Channel

Facebook Messenger integration with rich interactive elements:

from roomkit import MessengerChannel, FacebookMessengerProvider, MessengerConfig

provider = FacebookMessengerProvider(MessengerConfig(
    page_access_token="...",
    app_secret="...",
))
messenger = MessengerChannel("fb-channel", provider=provider)

Recipient ID read from binding.metadata["facebook_user_id"]. Supports buttons (max 3), quick replies, and templates. Includes parse_messenger_webhook() for inbound webhook parsing.

WhatsApp Channel

WhatsApp Business integration:

from roomkit import WhatsAppChannel

wa = WhatsAppChannel("wa-channel", provider=whatsapp_provider)

Recipient phone read from binding.metadata["phone_number"]. Supports text, rich text, media, location, templates, buttons (max 3), and quick replies. Max message length: 4096 characters. Currently mock-only; no production provider.

HTTP Webhook Channel

Generic webhook transport for custom integrations:

from roomkit import HTTPChannel, WebhookHTTPProvider, HTTPProviderConfig

provider = WebhookHTTPProvider(HTTPProviderConfig(
    webhook_url="https://example.com/webhook",
))
http = HTTPChannel("http-channel", provider=provider)

Recipient ID read from binding.metadata["recipient_id"]. Includes parse_http_webhook() for inbound webhook parsing.


Resilience Features

Circuit Breaker

Automatic fault isolation for failing provider channels:

  • Closed -- Normal operation; requests flow through
  • Open -- After 5 consecutive failures, all requests are rejected immediately
  • Half-open -- After 60s recovery timeout, allows one probe request
  • Successful probe resets to Closed; failure keeps Open
  • Per-channel instances managed by the EventRouter
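These state transitions fit in a small class. The sketch below is hypothetical: `CircuitBreaker` and its methods are illustrative names, not RoomKit's implementation, and it takes an injectable clock so it can be tested without waiting. Two behaviours are assumptions the text does not spell out: half-open lets exactly one probe through, and a failed probe restarts the 60s recovery timer.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Minimal closed/open/half-open breaker with the thresholds described above."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None
        self._probing = False

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        state = self.state
        if state == "closed":
            return True
        if state == "half_open" and not self._probing:
            self._probing = True  # let exactly one probe through
            return True
        return False  # open, or a probe is already in flight

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # back to closed
        self._probing = False

    def record_failure(self) -> None:
        if self._probing:
            self._probing = False
            self._opened_at = self._clock()  # failed probe: stay open, restart the timer
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```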

Rate Limiting

Per-channel token bucket rate limiter:

await kit.attach_channel("room-1", "sms-channel",
    rate_limit=RateLimit(max_per_second=1.0),
    metadata={"phone_number": "+15551234567"},
)

Configurable via max_per_second, max_per_minute, or max_per_hour. Uses wait-based backpressure (queues requests until a token is available).
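A minimal token bucket with wait-based backpressure might look like the sketch below. It is not RoomKit's limiter. `TokenBucket` is a hypothetical name, and the burst capacity of 1 is an assumption. Under this sketch, max_per_minute would map to `rate_per_second = max_per_minute / 60`.

```python
import asyncio
import time


class TokenBucket:
    """Sketch of a wait-based token bucket: acquire() sleeps until a token is free."""

    def __init__(self, rate_per_second: float, capacity: float = 1.0) -> None:
        self.rate = rate_per_second
        self.capacity = capacity
        self._tokens = capacity
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self) -> None:
        async with self._lock:  # serialize waiters so tokens are handed out in arrival order
            self._refill()
            while self._tokens < 1:
                # Sleep roughly long enough for the missing fraction of a token to accrue
                await asyncio.sleep((1 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1
```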

Retry with Backoff

Configurable exponential backoff for transient delivery failures:

await kit.attach_channel("room-1", "email-channel",
    retry_policy=RetryPolicy(
        max_retries=3,
        base_delay_seconds=1.0,
        max_delay_seconds=60.0,
        exponential_base=2.0,    # delay = base_delay_seconds * exponential_base ** attempt, capped at max_delay_seconds
    ),
    metadata={"email_address": "user@example.com"},
)

After exhausting all retries, the last exception is raised and recorded in the circuit breaker.
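Assuming attempts count from 0 and the cap is applied with min(), the policy above waits 1s, 2s and 4s before its three retries. The hypothetical helpers below make that schedule explicit and re-raise the last error once the retries run out, matching the behaviour described above. They add no jitter, and neither is part of RoomKit.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int = 3, base_delay_seconds: float = 1.0,
                   max_delay_seconds: float = 60.0, exponential_base: float = 2.0) -> list[float]:
    """Delay before each retry: base * exponential_base ** attempt, capped at the max."""
    return [
        min(base_delay_seconds * exponential_base ** attempt, max_delay_seconds)
        for attempt in range(max_retries)
    ]


async def with_retry(send: Callable[[], Awaitable[T]], delays: list[float]) -> T:
    """Try once, then once more after each delay; re-raise the last error if all fail."""
    last_exc: Optional[BaseException] = None
    for delay in [0.0, *delays]:  # 0.0 = the initial attempt, no wait
        if delay > 0:
            await asyncio.sleep(delay)
        try:
            return await send()
        except Exception as exc:
            last_exc = exc
    assert last_exc is not None  # the loop always runs at least once
    raise last_exc


print(backoff_delays())  # [1.0, 2.0, 4.0]
```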

Chain Depth Limiting

Prevents infinite loops when AI channels generate responses that trigger other AI channels:

kit = RoomKit(max_chain_depth=5)  # Default: 5

Events exceeding the chain depth limit are stored with status=BLOCKED and blocked_by="event_chain_depth_limit". An Observation is created documenting the exceeded depth. A framework event chain_depth_exceeded is emitted.
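Two bots replying to each other show how the limit works. The sketch below assumes a human message has depth 0, that each AI reply is one hop deeper than its trigger, and that a reply is blocked once its depth exceeds max_chain_depth. `ChainedEvent` and `respond()` are hypothetical; only the blocked_by value comes from the text above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ChainedEvent:
    # Hypothetical event that records how many AI hops produced it
    text: str
    chain_depth: int = 0
    status: str = "delivered"
    blocked_by: Optional[str] = None


def respond(trigger: ChainedEvent, reply_text: str, max_chain_depth: int = 5) -> ChainedEvent:
    """Build an AI reply one hop deeper than its trigger; block it past the limit."""
    reply = ChainedEvent(reply_text, chain_depth=trigger.chain_depth + 1)
    if reply.chain_depth > max_chain_depth:
        reply.status = "blocked"
        reply.blocked_by = "event_chain_depth_limit"
    return reply


# Two AI channels ping-ponging after one human message
event = ChainedEvent("hi from a human")
hops = 0
while event.status != "blocked":
    event = respond(event, f"reply {hops}")
    hops += 1
print(hops)  # 6: five replies go out, the sixth is stored as blocked
```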

Idempotency

Duplicate message detection via idempotency keys:

result = await kit.process_inbound(InboundMessage(
    channel_id="sms-channel",
    sender_id="+15551234567",
    content=TextContent(body="Hello"),
    idempotency_key="provider-msg-id-12345",
))

The idempotency check is performed inside the room lock to prevent TOCTOU races.
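The lock matters because the duplicate check and the write may be separated by an await. Without a lock, two concurrent deliveries of the same webhook could both pass the check. In the hypothetical `IdempotentProcessor` below, which is not RoomKit's store, both steps run inside one per-room `asyncio.Lock`, so only one of five concurrent copies is accepted.

```python
import asyncio
from collections import defaultdict


class IdempotentProcessor:
    """Sketch: check-then-record under a per-room lock, so duplicates can't slip in."""

    def __init__(self) -> None:
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self._seen: set[tuple[str, str]] = set()
        self.stored: list[str] = []

    async def process(self, room_id: str, key: str, body: str) -> bool:
        async with self._locks[room_id]:
            if (room_id, key) in self._seen:
                return False  # duplicate: rejected
            # Stand-in for async work (hooks, storage) between the check and the write;
            # without the lock, other copies could pass the check during this await
            await asyncio.sleep(0)
            self._seen.add((room_id, key))
            self.stored.append(body)
            return True
```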


Participant Roles and Permissions

Roles

| Role | Description |
|---|---|
| OWNER | Room creator with full control |
| AGENT | Support agent or operator |
| MEMBER | Regular participant |
| OBSERVER | Read-only observer (analytics, compliance) |
| BOT | Automated participant (AI, webhook) |

Participant Statuses

| Status | Description |
|---|---|
| ACTIVE | Currently participating |
| INACTIVE | Temporarily away |
| LEFT | Has left the room |
| BANNED | Removed from the room |

Channel Access Control

Access levels control what each channel can do within a room:

| Access | Can send | Receives broadcasts |
|---|---|---|
| READ_WRITE | x | x |
| READ_ONLY | -- | x |
| WRITE_ONLY | x | -- |
| NONE | -- | -- |
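The access table reduces to two predicates, which together decide who receives a broadcast. In this sketch, `AccessLevel` is a hypothetical mirror of the table rather than RoomKit's Access enum. It assumes that an event from a channel that cannot send is simply dropped, and that the sender never receives its own message back.

```python
from enum import Enum


class AccessLevel(Enum):
    # Hypothetical mirror of the access levels in the table above
    READ_WRITE = "read_write"
    READ_ONLY = "read_only"
    WRITE_ONLY = "write_only"
    NONE = "none"


def can_send(access: AccessLevel) -> bool:
    return access in (AccessLevel.READ_WRITE, AccessLevel.WRITE_ONLY)


def receives_broadcasts(access: AccessLevel) -> bool:
    return access in (AccessLevel.READ_WRITE, AccessLevel.READ_ONLY)


def broadcast_targets(source: str, bindings: dict[str, AccessLevel]) -> list[str]:
    """Channels that should get a message from `source` (never echoed back to it)."""
    if not can_send(bindings[source]):
        return []  # assumption: events from channels that cannot send are dropped
    return [cid for cid, acc in bindings.items()
            if cid != source and receives_broadcasts(acc)]
```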

Access can be changed dynamically:

await kit.set_access("room-1", "observer-channel", Access.READ_ONLY)
await kit.set_visibility("room-1", "ai-channel", "intelligence")

Muting

Muting suppresses a channel's response events without detaching it:

await kit.mute("room-1", "ai-bot")    # AI stops responding but keeps analyzing
await kit.unmute("room-1", "ai-bot")   # AI resumes responding

Muted channels still receive events via on_event() and can produce tasks and observations. Only their response_events are suppressed. This enables "silent observer" patterns where AI monitors without participating.
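The "silent observer" rule can be reduced to a single filter. In the sketch below, `ChannelOutput` and `apply_mute()` are hypothetical names, not RoomKit's types. Muting drops only the response events and keeps tasks and observations.

```python
from dataclasses import dataclass, field


@dataclass
class ChannelOutput:
    # Hypothetical result of an intelligence channel handling one event
    response_events: list[str] = field(default_factory=list)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)


def apply_mute(output: ChannelOutput, muted: bool) -> ChannelOutput:
    """Muting suppresses response_events only; side effects still flow through."""
    if not muted:
        return output
    return ChannelOutput(response_events=[], tasks=output.tasks,
                         observations=output.observations)
```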


Observability

Framework Events

RoomKit emits FrameworkEvent objects for observability via the @kit.on() decorator:

@kit.on("room_created")
async def handle_room_created(event: FrameworkEvent) -> None:
    print(f"Room created: {event.data}")

Framework event types:

  • room_created, room_closed, room_paused
  • room_channel_attached, room_channel_detached
  • event_blocked, event_processed
  • delivery_succeeded, delivery_failed
  • broadcast_partial_failure
  • chain_depth_exceeded
  • identity_timeout, process_timeout
  • hook_error
  • channel_connected, channel_disconnected (WebSocket)

Hook-Based Monitoring

Async hooks can be used for real-time monitoring and analytics:

@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC)
async def monitor(event: RoomEvent, ctx: RoomContext) -> None:
    await metrics.increment("messages_processed")
    await metrics.gauge("room_event_count", ctx.room.event_count)

Side Effects: Tasks and Observations

Hooks and intelligence channels can produce structured side effects:

  • Tasks -- Work items with status tracking (PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED). Include title, description, assigned_to, and metadata.
  • Observations -- Intelligence findings with category (e.g., "sentiment", "compliance_flag"), confidence score (0-1), and metadata.

Both are persisted via the ConversationStore and queryable per room:

tasks = await kit.list_tasks("room-1", status="pending")
observations = await kit.list_observations("room-1")

User Workflows

Customer Support Flow

sequenceDiagram
    participant Customer as Customer (SMS)
    participant Room as RoomKit Room
    participant AI as AI Assistant
    participant Agent as Agent (WebSocket)

    Customer->>Room: "I need help with my account"
    Room->>AI: Event broadcast
    AI->>Room: AI response: "I can help! What's your account number?"
    Room->>Customer: SMS delivery (transcoded to text)
    Room->>Agent: WebSocket delivery (agent sees conversation)

    Customer->>Room: "Account #12345"
    Room->>AI: Event broadcast
    AI->>Room: AI response with account lookup
    Room->>Customer: SMS delivery
    Room->>Agent: WebSocket delivery

    Note over Agent: Agent takes over
    Agent->>Room: "Let me handle this personally"
    Room->>Customer: SMS delivery
    Room->>AI: Event broadcast (AI observes)

Multi-Channel Notification Flow

sequenceDiagram
    participant System as Backend System
    participant Room as RoomKit Room
    participant WS as WebSocket (App)
    participant Email as Email
    participant SMS as SMS

    System->>Room: System event via send_event()
    Room->>WS: Rich notification (real-time)
    Room->>Email: Full notification (HTML, threaded)
    Room->>SMS: Text summary (max 1600 chars)

Identity Verification Flow

sequenceDiagram
    participant User as Unknown Sender (SMS)
    participant Room as RoomKit Room
    participant IR as Identity Resolver
    participant Hook as Identity Hook

    User->>Room: Inbound message
    Room->>IR: resolve(message, context)
    IR-->>Room: UNKNOWN

    Room->>Hook: ON_IDENTITY_UNKNOWN
    Hook-->>Room: IdentityHookResult.challenge()
    Note over Hook: Injects verification message

    Room->>User: "Please verify: reply with your DOB"
    Note over Room: Original message blocked

    User->>Room: "1990-01-15"
    Room->>IR: resolve(message, context)
    IR-->>Room: IDENTIFIED (identity resolved)
    Note over Room: Message proceeds through pipeline

Dynamic Channel Management

sequenceDiagram
    participant Admin as Admin
    participant Room as RoomKit Room
    participant AI as AI Channel
    participant SMS as SMS Channel

    Admin->>Room: attach_channel("ai-bot")
    Note over Room: AI starts responding to messages

    Admin->>Room: mute("ai-bot")
    Note over AI: AI still receives events, produces tasks/observations
    Note over AI: But response messages are suppressed

    Admin->>Room: set_access("sms-out", READ_ONLY)
    Note over SMS: SMS can only observe, cannot send

    Admin->>Room: unmute("ai-bot")
    Note over AI: AI resumes responding

    Admin->>Room: detach_channel("sms-out")
    Note over SMS: SMS removed from room

Integration Points

Inbound Message Ingestion

External systems deliver messages to RoomKit via process_inbound():

# From a FastAPI webhook handler
@app.post("/webhook/sms")
async def sms_webhook(request: Request):
    payload = await request.json()
    message = parse_voicemeup_webhook(payload)
    result = await kit.process_inbound(message)
    return {"status": "ok", "blocked": result.blocked}

Direct Event Injection

Send events directly into a room without going through the inbound pipeline:

event = await kit.send_event(
    room_id="room-1",
    channel_id="system-channel",
    content=TextContent(body="System maintenance in 5 minutes"),
)

Webhook Parsers

Built-in webhook parsers for provider-specific payloads:

  • parse_sinch_webhook() -- Sinch SMS webhooks
  • parse_telnyx_webhook() -- Telnyx SMS webhooks
  • parse_twilio_webhook() -- Twilio SMS webhooks (form-encoded)
  • parse_voicemeup_webhook() -- VoiceMeUp SMS webhooks
  • parse_messenger_webhook() -- Facebook Messenger webhooks
  • parse_http_webhook() -- Generic HTTP webhook payloads
  • extract_sms_meta() -- Normalized metadata extraction for any SMS provider

Custom Storage Backends

Implement ConversationStore for any persistence layer:

class PostgresStore(ConversationStore):
    async def create_room(self, room: Room) -> Room:
        async with self.pool.acquire() as conn:
            await conn.execute("INSERT INTO rooms ...", room.model_dump())
        return room
    # ... implement remaining 29 abstract methods

Custom AI Providers

Implement AIProvider for any AI service:

class CustomAIProvider(AIProvider):
    async def generate(self, context: AIContext) -> AIResponse:
        response = await my_ai_client.chat(context.messages)
        return AIResponse(content=response.text, usage={"tokens": response.usage})

Custom Identity Resolvers

Implement IdentityResolver for any user directory:

class CRMIdentityResolver(IdentityResolver):
    async def resolve(self, message: InboundMessage, context: RoomContext) -> IdentityResult:
        user = await crm.lookup_by_phone(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)

Custom Inbound Routers

Override the default room routing strategy:

class TenantRouter(InboundRoomRouter):
    async def route(self, channel_id, channel_type, participant_id) -> str | None:
        # Route to room based on tenant-specific logic
        return await self.lookup_tenant_room(participant_id)

kit = RoomKit(inbound_router=TenantRouter())

Current Limitations

  • Single-process only (default) -- InMemoryLockManager and InMemoryRealtime use asyncio primitives; distributed deployments require custom RoomLockManager and RealtimeBackend implementations
  • In-memory storage only (default) -- No persistent store ships with the library; production use requires a custom ConversationStore
  • No built-in HTTP server -- RoomKit is a library, not a server; webhook endpoints must be provided by the host application
  • No file/media storage -- MediaContent stores URLs; actual file hosting must be handled externally
  • No end-to-end encryption -- Content is available in plaintext within the pipeline; encryption must be handled at the transport level
  • Limited WhatsApp support -- Mock provider only; no production WhatsApp Business API integration
  • No voice channel -- ChannelType.VOICE is defined but not implemented
  • No push notification channel -- ChannelType.PUSH is defined but not implemented
  • No pause_room() method -- Rooms can only be paused via timers (check_room_timers), not directly

Potential Enhancements

  • Persistent storage backends -- PostgreSQL, Redis, or MongoDB ConversationStore implementations
  • Distributed locking -- Redis-based RoomLockManager for multi-process deployments
  • Distributed realtime -- Redis pub/sub or NATS-based RealtimeBackend for multi-process deployments
  • Event streaming -- Kafka or Redis Streams integration for cross-service event distribution
  • OpenTelemetry integration -- Built-in tracing and metrics via the FrameworkEvent system
  • Voice channel -- WebRTC or telephony integration via Twilio Voice or similar
  • Push notifications -- Firebase Cloud Messaging or APNs provider
  • WhatsApp Business API -- Full provider implementation with template management
  • Message search -- Full-text search across room events
  • File storage abstraction -- S3/GCS integration for media content hosting
  • Admin dashboard -- Web UI for room management and monitoring
  • Rate limit queuing -- Queue-and-drain instead of drop when rate limited
  • Direct pause/resume -- Explicit pause_room() and resume_room() methods