Technical Documentation¶
Technology Stack¶
| Category | Technology | Version |
|---|---|---|
| Language | Python | >= 3.12 |
| Data Validation | Pydantic | >= 2.9 |
| HTTP Client | httpx | >= 0.27 (optional) |
| AI - Anthropic | anthropic SDK | >= 0.30 (optional) |
| AI - OpenAI | openai SDK | >= 1.30 (optional) |
| AI - Gemini | google-genai | >= 1.0.0 (optional) |
| AI - Mistral | mistralai | >= 1.0 (optional) |
| SMS - Twilio | twilio | >= 9.0 (optional) |
| Phone Validation | phonenumbers | >= 8.13 (optional) |
| Crypto (Telnyx) | pynacl | >= 1.5 (optional) |
| Voice - FastRTC | fastrtc + numpy | latest (optional) |
| Voice - STT | Deepgram (via httpx + websockets) | >= 0.27 / >= 13.0 (optional) |
| Voice - TTS | ElevenLabs (via httpx + websockets) | >= 0.27 / >= 13.0 (optional) |
| Build System | Hatchling | latest |
| Package Manager | uv | latest |
| Test Framework | pytest | >= 8.0 |
| Async Testing | pytest-asyncio | >= 0.24 |
| Coverage | pytest-cov | >= 5.0 |
| Type Checker | mypy | >= 1.11 (strict mode) |
| Linter/Formatter | Ruff | >= 0.6 |
| Git Hooks | pre-commit | >= 3.8 |
| Documentation | MkDocs + Material | >= 1.6 / >= 9.5 |
| API Docs | mkdocstrings[python] | >= 0.27 |
| CI/CD | GitHub Actions | -- |
Project Structure¶
roomkit/
├── .github/
│ └── workflows/
│ └── ci.yml # GitHub Actions CI pipeline
├── docs/ # MkDocs documentation source
│ ├── api/ # Auto-generated API reference
│ ├── index.md # Documentation home
│ ├── architecture.md # Architecture overview
│ ├── technical.md # This file
│ ├── features.md # Feature documentation
│ ├── ai-integration.md # AI integration guide
│ ├── mcp.md # MCP integration
│ ├── roomkit-rfc.md # RFC v11 (design specification)
│ └── cpaas-comparison.md # CPaaS comparison (Twilio vs RoomKit)
├── examples/ # Executable example scripts
│ ├── quickstart.py # WebSocket + AI quickstart
│ ├── anthropic_ai.py # Anthropic Claude integration
│ ├── openai_ai.py # OpenAI integration
│ ├── http_webhook.py # Generic HTTP webhook
│ ├── voicemeup_sms.py # VoiceMeUp SMS provider
│ ├── elasticemail.py # ElasticEmail provider
│ └── facebook_messenger.py # Facebook Messenger integration
├── src/roomkit/ # Main library source
│ ├── __init__.py # Public API (~150 exports via __all__)
│ ├── _version.py # Version string (0.1.1)
│ ├── py.typed # PEP 561 type marker
│ ├── ai_docs.py # AI documentation helpers
│ ├── core/ # Framework internals
│ │ ├── framework.py # RoomKit orchestrator (mixin composition)
│ │ ├── _inbound.py # InboundMixin (message pipeline)
│ │ ├── _channel_ops.py # ChannelOpsMixin (channel management)
│ │ ├── _room_lifecycle.py # RoomLifecycleMixin (room CRUD, timers)
│ │ ├── _helpers.py # HelpersMixin (shared utilities)
│ │ ├── hooks.py # HookEngine (sync/async pipelines)
│ │ ├── event_router.py # EventRouter (broadcast + delivery)
│ │ ├── inbound_router.py # InboundRoomRouter (message routing)
│ │ ├── transcoder.py # DefaultContentTranscoder
│ │ ├── circuit_breaker.py # CircuitBreaker (fault isolation)
│ │ ├── rate_limiter.py # TokenBucketRateLimiter
│ │ ├── retry.py # retry_with_backoff()
│ │ └── locks.py # RoomLockManager + InMemoryLockManager
│ ├── channels/ # Channel implementations
│ │ ├── __init__.py # Factory functions (SMSChannel, EmailChannel, RCSChannel, etc.)
│ │ ├── base.py # Channel ABC
│ │ ├── transport.py # TransportChannel (unified generic channel)
│ │ ├── websocket.py # WebSocketChannel
│ │ ├── ai.py # AIChannel (intelligence layer)
│ │ └── voice.py # VoiceChannel (real-time audio with STT/TTS)
│ ├── models/ # Pydantic data models
│ │ ├── enums.py # All StrEnum types (16 enums)
│ │ ├── event.py # RoomEvent, EventContent (9 content types)
│ │ ├── room.py # Room, RoomTimers
│ │ ├── channel.py # ChannelBinding, ChannelCapabilities, ChannelOutput, RateLimit, RetryPolicy
│ │ ├── hook.py # HookResult, InjectedEvent
│ │ ├── identity.py # Identity, IdentityResult, IdentityHookResult
│ │ ├── participant.py # Participant
│ │ ├── delivery.py # InboundMessage, InboundResult, ProviderResult, DeliveryResult, DeliveryStatus
│ │ ├── context.py # RoomContext
│ │ ├── task.py # Task, Observation
│ │ └── framework_event.py # FrameworkEvent
│ ├── providers/ # External service providers
│ │ ├── ai/ # AIProvider ABC + MockAIProvider
│ │ ├── anthropic/ # AnthropicAIProvider + AnthropicConfig
│ │ ├── openai/ # OpenAIAIProvider + OpenAIConfig
│ │ ├── gemini/ # GeminiAIProvider + GeminiConfig
│ │ ├── mistral/ # MistralAIProvider + MistralConfig
│ │ ├── sms/ # SMSProvider ABC + MockSMSProvider + utilities
│ │ ├── voicemeup/ # VoiceMeUpSMSProvider + VoiceMeUpConfig + MMS aggregation
│ │ ├── twilio/ # TwilioSMSProvider + TwilioRCSProvider + configs
│ │ ├── telnyx/ # TelnyxSMSProvider + TelnyxRCSProvider + configs
│ │ ├── sinch/ # SinchSMSProvider + SinchConfig
│ │ ├── rcs/ # RCSProvider ABC + MockRCSProvider
│ │ ├── email/ # EmailProvider ABC + MockEmailProvider
│ │ ├── elasticemail/ # ElasticEmailProvider + ElasticEmailConfig
│ │ ├── sendgrid/ # SendGridConfig (scaffolded)
│ │ ├── messenger/ # MessengerProvider ABC + FacebookMessengerProvider + MockMessengerProvider
│ │ ├── whatsapp/ # WhatsAppProvider ABC + MockWhatsAppProvider
│ │ └── http/ # HTTPProvider ABC + WebhookHTTPProvider + MockHTTPProvider
│ ├── identity/ # Identity resolution
│ │ ├── base.py # IdentityResolver ABC
│ │ └── mock.py # MockIdentityResolver
│ ├── realtime/ # Ephemeral events
│ │ ├── base.py # RealtimeBackend ABC
│ │ └── memory.py # InMemoryRealtime
│ ├── voice/ # Voice/audio support
│ │ ├── __init__.py # Lazy loaders for optional providers
│ │ ├── base.py # Shared types: VoiceSession, AudioChunk, callbacks
│ │ ├── events.py # Voice event types (BargeIn, VAD, etc.)
│ │ ├── stt/ # Speech-to-text providers
│ │ │ ├── base.py # STTProvider ABC
│ │ │ ├── deepgram.py # DeepgramSTTProvider + DeepgramConfig
│ │ │ └── mock.py # MockSTTProvider
│ │ ├── tts/ # Text-to-speech providers
│ │ │ ├── base.py # TTSProvider ABC
│ │ │ ├── elevenlabs.py # ElevenLabsTTSProvider + ElevenLabsConfig
│ │ │ └── mock.py # MockTTSProvider
│ │ ├── backends/ # Voice transport backends
│ │ │ ├── base.py # VoiceBackend ABC
│ │ │ ├── fastrtc.py # FastRTCVoiceBackend + mount_fastrtc_voice
│ │ │ └── mock.py # MockVoiceBackend + MockVoiceCall
│ │ └── realtime/ # Realtime voice (speech-to-speech)
│ │ ├── base.py # RealtimeSession, RealtimeSessionState
│ │ ├── provider.py # RealtimeVoiceProvider ABC
│ │ ├── transport.py # RealtimeAudioTransport ABC
│ │ ├── ws_transport.py # WebSocketRealtimeTransport
│ │ ├── fastrtc_transport.py # FastRTCRealtimeTransport + mount_fastrtc_realtime
│ │ ├── events.py # Realtime voice events
│ │ └── mock.py # MockRealtimeProvider + MockRealtimeTransport
│ └── store/ # Persistence layer
│ ├── base.py # ConversationStore ABC (30 abstract methods)
│ └── memory.py # InMemoryStore implementation
├── tests/ # Test suite
│ ├── conftest.py # Shared fixtures
│ ├── test_channels/ # Channel-specific tests (7 files)
│ ├── test_providers/ # Provider-specific tests (15+ files)
│ ├── test_integration/ # End-to-end integration tests (8+ files)
│ └── test_*.py # Unit tests (~35 files)
├── site/ # Built documentation output
├── pyproject.toml # Project configuration
├── Makefile # Development commands
├── mkdocs.yml # MkDocs configuration
├── uv.lock # Dependency lock file
├── .pre-commit-config.yaml # Pre-commit hooks
├── AGENTS.md # AI coding assistant context
├── llms.txt # LLM context document
├── README.md # Project README
├── CONTRIBUTING.md # Contribution guidelines
└── LICENSE # MIT license
Data Models¶
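Unless noted otherwise, models inherit from pydantic.BaseModel and use Pydantic v2 validation. The voice types listed under Voice Models are standard-library enums and dataclasses.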
Room¶
class Room(BaseModel):
id: str
organization_id: str | None
status: RoomStatus # ACTIVE | PAUSED | CLOSED | ARCHIVED
created_at: datetime
updated_at: datetime
closed_at: datetime | None
timers: RoomTimers
metadata: dict[str, Any]
event_count: int
latest_index: int
RoomEvent¶
class RoomEvent(BaseModel):
id: str # UUID hex
room_id: str
type: EventType # MESSAGE, SYSTEM, TYPING, etc.
source: EventSource
content: EventContent # Discriminated union (see below)
status: EventStatus # PENDING | DELIVERED | READ | FAILED | BLOCKED
blocked_by: str | None
visibility: str # "all" or channel-specific filter
index: int
chain_depth: int # Reentry depth for loop prevention
parent_event_id: str | None
correlation_id: str | None
idempotency_key: str | None
created_at: datetime
metadata: dict[str, Any]
channel_data: ChannelData
delivery_results: dict[str, Any]
Content Types (Discriminated Union)¶
EventContent = Annotated[
TextContent # Plain text with optional language
| RichContent # HTML/Markdown with buttons, cards, quick replies
| MediaContent # File attachment (URL, MIME type, caption)
| LocationContent # Lat/lng with label and address
| AudioContent # Audio file with optional transcript
| VideoContent # Video file with optional thumbnail
| CompositeContent # Multi-part combining multiple types (max depth 5)
| SystemContent # System messages with code and data
| TemplateContent, # Pre-approved templates (WhatsApp Business)
Field(discriminator="type"),
]
ChannelBinding¶
class ChannelBinding(BaseModel):
channel_id: str
room_id: str
channel_type: ChannelType
category: ChannelCategory # TRANSPORT | INTELLIGENCE
direction: ChannelDirection # INBOUND | OUTBOUND | BIDIRECTIONAL
access: Access # READ_WRITE | READ_ONLY | WRITE_ONLY | NONE
muted: bool
visibility: str
participant_id: str | None
last_read_index: int | None
attached_at: datetime
capabilities: ChannelCapabilities
rate_limit: RateLimit | None
retry_policy: RetryPolicy | None
metadata: dict[str, Any]
ChannelCapabilities¶
class ChannelCapabilities(BaseModel):
media_types: list[ChannelMediaType]
max_length: int | None
supports_threading: bool
supports_reactions: bool
supports_read_receipts: bool
supports_typing: bool
supports_templates: bool
supports_rich_text: bool
supports_buttons: bool
max_buttons: int | None
supports_cards: bool
supports_quick_replies: bool
supports_media: bool
supported_media_types: list[str]
max_media_size_bytes: int | None
supports_audio: bool
supports_video: bool
delivery_mode: DeliveryMode # BROADCAST | DIRECT | ROUND_ROBIN
DeliveryStatus¶
class DeliveryStatus(BaseModel):
"""Status update for an outbound message from a provider webhook."""
provider: str # e.g., "telnyx", "twilio"
message_id: str # Provider's unique message identifier
status: str # e.g., "sent", "delivered", "failed"
recipient: str # Phone number/address sent to
sender: str # Phone number/address sent from
error_code: str | None # Provider-specific error code
error_message: str | None # Human-readable error message
timestamp: str | None # When the status was reported
raw: dict[str, Any] # Original webhook payload
AI Models¶
class AITool(BaseModel):
"""Tool definition for function calling."""
name: str
description: str
parameters: dict[str, Any]
class AIToolCall(BaseModel):
"""A tool call from the AI response."""
id: str
name: str
arguments: dict[str, Any]
class AIContext(BaseModel):
"""Context passed to AI provider for generation."""
messages: list[AIMessage]
system_prompt: str | None
temperature: float
max_tokens: int
tools: list[AITool] # Function calling definitions
room: RoomContext | None
target_capabilities: ChannelCapabilities | None
target_media_types: list[ChannelMediaType]
class AIResponse(BaseModel):
"""Response from an AI provider."""
content: str
finish_reason: str | None
usage: dict[str, int]
tool_calls: list[AIToolCall] # Function calls to execute
tasks: list[Task]
observations: list[Observation]
Voice Models¶
class VoiceSessionState(StrEnum):
CONNECTING, ACTIVE, PAUSED, ENDED
class VoiceCapability(Flag):
NONE = 0
INTERRUPTION = auto() # Can cancel audio playback
PARTIAL_STT = auto() # Provides partial transcription
VAD_SILENCE = auto() # Emits silence detection events
VAD_AUDIO_LEVEL = auto() # Emits audio level events
BARGE_IN = auto() # Detects when the user interrupts TTS playback
@dataclass
class VoiceSession:
id: str
room_id: str
participant_id: str
channel_id: str
state: VoiceSessionState
created_at: datetime
metadata: dict[str, Any] # Contains input/output sample rates
@dataclass
class AudioChunk:
data: bytes
sample_rate: int = 16000
channels: int = 1
format: str = "pcm_s16le"
timestamp_ms: int | None = None
is_final: bool = False
@dataclass(frozen=True)
class BargeInEvent:
"""User started speaking while TTS was playing."""
session: VoiceSession
interrupted_text: str
audio_position_ms: int
timestamp: datetime
@dataclass(frozen=True)
class TTSCancelledEvent:
session: VoiceSession
reason: Literal["barge_in", "explicit", "disconnect", "error"]
text: str
audio_position_ms: int
timestamp: datetime
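Because VoiceCapability is an enum.Flag, a backend can advertise several capabilities as a single combined value, and callers can test for any subset of them. A small sketch, with the enum redefined locally so it runs without RoomKit installed; the `supports` helper is hypothetical and not part of the library:

```python
from enum import Flag, auto


class VoiceCapability(Flag):
    """Local copy of the flag enum above, so the sketch is standalone."""

    NONE = 0
    INTERRUPTION = auto()
    PARTIAL_STT = auto()
    VAD_SILENCE = auto()
    VAD_AUDIO_LEVEL = auto()
    BARGE_IN = auto()


def supports(declared: VoiceCapability, wanted: VoiceCapability) -> bool:
    """Hypothetical helper: True if every flag in `wanted` is set in `declared`."""
    return (declared & wanted) == wanted


# A backend that can cancel playback and detect barge-in, but nothing else.
backend_caps = VoiceCapability.INTERRUPTION | VoiceCapability.BARGE_IN

can_barge_in = supports(backend_caps, VoiceCapability.BARGE_IN)
has_partial_stt = supports(backend_caps, VoiceCapability.PARTIAL_STT)
```

A check like this is one way to decide whether registering an enhanced callback such as on_barge_in is worthwhile for a given backend.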
Enums¶
| Enum | Values |
|---|---|
| ChannelType | sms, mms, rcs, email, whatsapp, websocket, ai, voice, push, messenger, webhook, system |
| ChannelCategory | transport, intelligence |
| ChannelDirection | inbound, outbound, bidirectional |
| ChannelMediaType | text, rich, media, audio, video, location, template |
| EventType | message, system, typing, read_receipt, delivery_receipt, presence, reaction, edit, delete, participant_joined, participant_left, participant_identified, channel_attached, channel_detached, channel_muted, channel_unmuted, channel_updated, task_created, observation |
| EventStatus | pending, delivered, read, failed, blocked |
| RoomStatus | active, paused, closed, archived |
| Access | read_write, read_only, write_only, none |
| IdentificationStatus | identified, pending, ambiguous, unknown, challenge_sent, rejected |
| ParticipantRole | owner, agent, member, observer, bot |
| ParticipantStatus | active, inactive, left, banned |
| TaskStatus | pending, in_progress, completed, failed, cancelled |
| DeliveryMode | broadcast, direct, round_robin |
| HookTrigger | before_broadcast, after_broadcast, on_channel_attached, on_channel_detached, on_channel_muted, on_channel_unmuted, on_room_created, on_room_paused, on_room_closed, on_identity_ambiguous, on_identity_unknown, on_participant_identified, on_task_created, on_error, on_delivery_status, on_speech_start, on_speech_end, on_transcription, before_tts, after_tts, on_barge_in, on_tts_cancelled, on_partial_transcription, on_vad_silence, on_vad_audio_level |
| HookExecution | sync, async |
API Design¶
Channel ABC¶
Every channel implements this interface:
class Channel(ABC):
channel_type: ChannelType
category: ChannelCategory
direction: ChannelDirection
@abstractmethod
async def handle_inbound(self, message: InboundMessage, context: RoomContext) -> RoomEvent:
"""Convert an inbound message into a RoomEvent."""
@abstractmethod
async def deliver(self, event: RoomEvent, binding: ChannelBinding, context: RoomContext) -> ChannelOutput:
"""Push an event to the external system."""
async def on_event(self, event: RoomEvent, binding: ChannelBinding, context: RoomContext) -> ChannelOutput:
"""React to an event (default: no-op for transport channels)."""
def capabilities(self) -> ChannelCapabilities:
"""Declare supported media types and features."""
async def close(self) -> None:
"""Close the channel and its provider."""
Voice ABCs¶
class VoiceBackend(ABC):
"""Audio transport: WebSocket/WebRTC connections, VAD, session management."""
@property
@abstractmethod
def name(self) -> str: ...
@property
def capabilities(self) -> VoiceCapability: ...
@abstractmethod
async def connect(self, room_id, participant_id, channel_id, *, metadata=None) -> VoiceSession: ...
@abstractmethod
async def disconnect(self, session: VoiceSession) -> None: ...
@abstractmethod
def on_speech_start(self, callback: SpeechStartCallback) -> None: ...
@abstractmethod
def on_speech_end(self, callback: SpeechEndCallback) -> None: ...
@abstractmethod
async def send_audio(self, session, audio: bytes | AsyncIterator[AudioChunk]) -> None: ...
async def send_transcription(self, session, text, role="user") -> None: ...
async def cancel_audio(self, session) -> bool: ...
def is_playing(self, session) -> bool: ...
# Enhanced callbacks (opt-in via capabilities)
def on_partial_transcription(self, callback) -> None: ...
def on_vad_silence(self, callback) -> None: ...
def on_vad_audio_level(self, callback) -> None: ...
def on_barge_in(self, callback) -> None: ...
class STTProvider(ABC):
"""Speech-to-text: transcribe audio to text."""
@abstractmethod
async def transcribe(self, audio: AudioContent | AudioChunk) -> str: ...
async def transcribe_stream(self, audio_stream) -> AsyncIterator[TranscriptionResult]: ...
async def close(self) -> None: ...
class TTSProvider(ABC):
"""Text-to-speech: synthesize text to audio."""
@abstractmethod
async def synthesize(self, text, *, voice=None) -> AudioContent: ...
async def synthesize_stream(self, text, *, voice=None) -> AsyncIterator[AudioChunk]: ...
async def close(self) -> None: ...
RoomKit Public API¶
The RoomKit orchestrator exposes these public methods, organized by mixin:
class RoomKit(InboundMixin, ChannelOpsMixin, RoomLifecycleMixin, HelpersMixin):
def __init__(
self,
store=None, # ConversationStore (default: InMemoryStore)
identity_resolver=None, # IdentityResolver
identity_channel_types=None, # Restrict identity resolution to channel types
inbound_router=None, # InboundRoomRouter
lock_manager=None, # RoomLockManager (default: InMemoryLockManager)
realtime=None, # RealtimeBackend (default: InMemoryRealtime)
max_chain_depth=5, # Max AI reentry depth
identity_timeout=10.0, # Timeout for identity resolution (seconds)
process_timeout=30.0, # Timeout for locked processing (seconds)
)
# --- Properties ---
@property
def store(self) -> ConversationStore
@property
def hook_engine(self) -> HookEngine
@property
def realtime(self) -> RealtimeBackend
# --- Inbound processing (InboundMixin) ---
async def process_inbound(self, message: InboundMessage) -> InboundResult
# --- Webhook handling ---
async def process_webhook(self, meta: WebhookMeta, channel_id: str) -> None
async def process_delivery_status(self, status: DeliveryStatus) -> None
# --- Channel management (ChannelOpsMixin) ---
def register_channel(self, channel: Channel) -> None
async def attach_channel(self, room_id, channel_id, channel_type=None,
category=TRANSPORT, access=READ_WRITE,
visibility="all", **kwargs) -> ChannelBinding
async def detach_channel(self, room_id, channel_id) -> bool
async def mute(self, room_id, channel_id) -> ChannelBinding
async def unmute(self, room_id, channel_id) -> ChannelBinding
async def set_visibility(self, room_id, channel_id, visibility) -> ChannelBinding
async def set_access(self, room_id, channel_id, access) -> ChannelBinding
async def update_binding_metadata(self, room_id, channel_id, metadata) -> ChannelBinding
def get_channel(self, channel_id) -> Channel | None
def list_channels(self) -> list[Channel]
async def get_binding(self, room_id, channel_id) -> ChannelBinding
async def list_bindings(self, room_id) -> list[ChannelBinding]
# --- Room lifecycle (RoomLifecycleMixin) ---
async def create_room(self, room_id=None, metadata=None) -> Room
async def get_room(self, room_id) -> Room
async def close_room(self, room_id) -> Room
async def check_room_timers(self, room_id) -> Room
async def check_all_timers(self) -> list[Room]
async def update_room_metadata(self, room_id, metadata) -> Room
async def ensure_participant(self, room_id, channel_id, participant_id,
display_name=None) -> Participant
async def resolve_participant(self, room_id, participant_id, identity_id,
resolved_by="manual") -> Participant
# --- Direct send ---
async def send_event(self, room_id, channel_id, content,
event_type=MESSAGE, chain_depth=0) -> RoomEvent
# --- Queries ---
async def get_timeline(self, room_id, offset=0, limit=50,
visibility_filter=None) -> list[RoomEvent]
async def list_tasks(self, room_id, status=None) -> list[Task]
async def list_observations(self, room_id) -> list[Observation]
# --- WebSocket lifecycle ---
async def connect_websocket(self, channel_id, connection_id, send_fn) -> None
async def disconnect_websocket(self, channel_id, connection_id) -> None
# --- Read tracking ---
async def mark_read(self, room_id, channel_id, event_id) -> None
async def mark_all_read(self, room_id, channel_id) -> None
# --- Realtime (ephemeral events) ---
async def publish_typing(self, room_id, user_id, is_typing=True) -> None
async def publish_presence(self, room_id, user_id, status) -> None
async def publish_read_receipt(self, room_id, user_id, event_id) -> None
async def subscribe_room(self, room_id, callback) -> str
async def unsubscribe_room(self, subscription_id) -> bool
# --- Hook registration ---
def hook(self, trigger, execution=SYNC, priority=0, name="", timeout=30.0,
channel_types=None, channel_ids=None, directions=None) -> decorator
def on(self, event_type: str) -> decorator # Framework event handler
def identity_hook(self, trigger, channel_types=None, channel_ids=None,
directions=None) -> decorator # Identity resolution hook
def on_delivery_status(self, fn) -> decorator # Delivery status handler
def add_room_hook(self, room_id, trigger, execution, fn, priority=0, name="") -> None
def remove_room_hook(self, room_id, name) -> bool
# --- Voice ---
async def connect_voice(self, room_id, participant_id, channel_id, metadata=None) -> VoiceSession
async def disconnect_voice(self, session: VoiceSession) -> None
async def transcribe(self, audio: AudioContent) -> str
async def synthesize(self, text: str, voice=None) -> AudioContent
# --- Context manager ---
async def close(self) -> None
async def __aenter__(self) -> RoomKit
async def __aexit__(self, ...) -> None
ConversationStore ABC¶
The storage interface defines 30 abstract methods across 8 categories:
| Category | Methods |
|---|---|
| Rooms | create_room, get_room, update_room, delete_room, list_rooms, find_rooms, find_latest_room, find_room_id_by_channel |
| Events | add_event, get_event, list_events, check_idempotency, get_event_count |
| Bindings | add_binding, get_binding, update_binding, remove_binding, list_bindings |
| Participants | add_participant, get_participant, update_participant, list_participants |
| Identities | create_identity, get_identity, resolve_identity, link_address |
| Tasks | add_task, get_task, list_tasks, update_task |
| Observations | add_observation, list_observations |
| Read Tracking | mark_read, mark_all_read, get_unread_count |
Configuration Management¶
Provider Configuration¶
All provider configs use Pydantic models with SecretStr for sensitive values:
class AnthropicConfig(BaseModel):
api_key: SecretStr
model: str = "claude-sonnet-4-20250514"
max_tokens: int = 1024
class OpenAIConfig(BaseModel):
api_key: SecretStr
model: str = "gpt-4o"
max_tokens: int = 1024
class GeminiConfig(BaseModel):
api_key: SecretStr
model: str = "gemini-2.0-flash"
max_tokens: int = 1024
temperature: float = 1.0
class TelnyxConfig(BaseModel):
api_key: SecretStr
from_number: str
messaging_profile_id: str | None = None
timeout: float = 10.0
class TelnyxRCSConfig(BaseModel):
api_key: SecretStr
agent_id: str
messaging_profile_id: str | None = None
timeout: float = 10.0
class TwilioConfig(BaseModel):
account_sid: str
auth_token: SecretStr
from_number: str
class VoiceMeUpConfig(BaseModel):
username: str
auth_token: SecretStr
from_number: str
base_url: str = "https://api.voicemeup.com/"
timeout: float = 10.0
class ElasticEmailConfig(BaseModel):
api_key: SecretStr
from_email: str
from_name: str | None = None
RoomKit Initialization¶
The orchestrator accepts pluggable components via constructor arguments:
kit = RoomKit(
store=InMemoryStore(), # Default
identity_resolver=MyResolver(), # Optional
identity_channel_types={ChannelType.SMS}, # Restrict identity to SMS only
inbound_router=MyRouter(), # Default: DefaultInboundRoomRouter
lock_manager=InMemoryLockManager(), # Default
realtime=InMemoryRealtime(), # Default
max_chain_depth=5, # Default
identity_timeout=10.0, # Default (seconds)
process_timeout=30.0, # Default (seconds)
)
Optional Dependencies¶
Dependencies are lazily loaded. Each provider group has its own optional extra:
[project.optional-dependencies]
httpx = ["httpx>=0.27"]
anthropic = ["anthropic>=0.30"]
openai = ["openai>=1.30"]
gemini = ["google-genai>=1.0.0"]
mistral = ["mistralai>=1.0"]
twilio = ["twilio>=9.0"]
phonenumbers = ["phonenumbers>=8.13"]
pynacl = ["pynacl>=1.5"]
fastrtc = ["fastrtc", "numpy"]
websocket = ["websockets>=13.0"]
sse = ["httpx>=0.27", "httpx-sse>=0.4"]
providers = ["roomkit[httpx,anthropic,openai,gemini,twilio]"]
all = ["roomkit[providers,phonenumbers,pynacl]"]
Build and Deployment¶
Build System¶
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.version]
path = "src/roomkit/_version.py"
[tool.hatch.build.targets.wheel]
packages = ["src/roomkit"]
artifacts = ["*.md", "*.txt"]
[tool.hatch.build.targets.wheel.force-include]
"AGENTS.md" = "roomkit/AGENTS.md"
"llms.txt" = "roomkit/llms.txt"
Development Commands (Makefile)¶
| Command | Action |
|---|---|
| make install | uv sync --extra dev |
| make lint | uv run ruff check src/ tests/ |
| make format | uv run ruff format src/ tests/ |
| make typecheck | uv run mypy src/roomkit/ |
| make test | uv run pytest |
| make coverage | uv run pytest --cov=roomkit --cov-report=term-missing |
| make all | lint + typecheck + test |
| make clean | Remove build artifacts, caches |
CI/CD Pipeline (GitHub Actions)¶
The CI pipeline runs on push to main and on pull requests:
jobs:
  lint: # Ruff check + format check (Python 3.12)
  typecheck: # mypy strict (Python 3.12)
  test: # pytest with coverage (Python 3.12 + 3.13 matrix)
Coverage reports are uploaded as artifacts for the Python 3.12 run.
Testing Strategy¶
Test Organization¶
| Category | Path | Scope |
|---|---|---|
| Unit | tests/test_*.py | Individual components (models, enums, hooks, store, locks, circuit breakers, retry, rate limiting, public API) |
| Channel | tests/test_channels/ | Channel implementations (websocket, sms, email, messenger, whatsapp, ai, http) |
| Provider | tests/test_providers/ | Provider integrations (anthropic, openai, gemini, voicemeup, twilio, telnyx, sinch, rcs, elasticemail, http, messenger) |
| Voice | tests/test_voice*.py, tests/test_fastrtc_realtime_transport.py | Voice subsystem: STT/TTS mocks, VoiceBackend, VoiceChannel pipeline, FastRTCVoiceBackend, mu-law encoding, FastRTCRealtimeTransport |
| Integration | tests/test_integration/ | Cross-component workflows (quickstart, AI assistant, human-AI, cross-channel, dynamic channels, observer, identity resolution, AI chain depth) |
Test Configuration¶
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto" # All async tests run automatically
filterwarnings = ["error"] # All warnings treated as errors
[tool.coverage.report]
show_missing = true
fail_under = 90 # 90% minimum coverage required
Testing Patterns¶
- Mock providers ship with the library (MockAIProvider, MockSMSProvider, MockEmailProvider, MockHTTPProvider, MockMessengerProvider, MockWhatsAppProvider, MockRCSProvider, MockIdentityResolver, MockSTTProvider, MockTTSProvider, MockVoiceBackend)
- Shared fixtures in conftest.py provide InMemoryStore, pre-built rooms, and event/binding factory helpers (make_event(), make_binding())
- All tests are async using pytest-asyncio with asyncio_mode="auto"
- Integration tests cover multi-channel flows, AI chain depth limiting, identity resolution, cross-channel delivery, and dynamic channel attachment
Key Test Scenarios¶
- Room lifecycle (create, pause, close with timers)
- Hook pipelines (sync block/allow/modify, async side effects, priority ordering, timeout handling, filtering)
- Cross-channel message routing and content transcoding
- AI channel reentry with chain depth limits
- AI per-room configuration and function calling
- Identity resolution with ambiguity handling and challenge flows
- Concurrent inbound processing with per-room locks
- Circuit breaker state transitions (closed/open/half-open)
- Rate limiter token refill behavior and wait semantics
- Idempotency key deduplication (checked inside room lock)
- Muted channel behavior (events received, responses suppressed, tasks/observations preserved)
- Visibility filtering (all, none, transport, intelligence, specific channel IDs)
- Framework event emission for observability
- RCS provider functionality and SMS fallback
- MMS aggregation (VoiceMeUp split webhooks)
- Delivery status webhook handling
- Voice pipeline: speech end → STT → hooks → inbound → AI → TTS → audio
- VoiceBackend session lifecycle and callback registration
- FastRTCVoiceBackend: session management, mu-law encoding, WebSocket audio send
- FastRTCRealtimeTransport: session/handler mapping, audio queueing, DataChannel messaging, disconnect cleanup
- Barge-in detection and TTS cancellation
- Voice capability flags and enhanced callbacks
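The closed/open/half-open transitions in the circuit breaker scenario follow a well-known pattern. The sketch below is a generic illustration with an injectable clock so it can be tested deterministically; it is not RoomKit's CircuitBreaker class, and the parameter names `failure_threshold`, `recovery_timeout`, and `clock` are assumptions.

```python
import time
from collections.abc import Callable


class CircuitBreaker:
    """Generic three-state breaker; names and defaults are illustrative."""

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: float | None = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half-open"  # Timeout elapsed: allow a trial request.
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        # Any success fully resets the breaker.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        if self.state == "half-open":
            # The trial request failed: re-open and restart the timeout.
            self._opened_at = self._clock()
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


# Drive the breaker with a fake clock instead of real time.
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=10.0, clock=lambda: now[0])
```

Injecting the clock is the design choice that makes the state transitions testable without sleeping.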
Code Quality Tools¶
Ruff (Linter + Formatter)¶
[tool.ruff]
target-version = "py312"
line-length = 99
src = ["src", "tests"]
[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP", "B", "SIM"]
Rules enabled:
- E -- pycodestyle errors
- F -- pyflakes
- I -- isort (import sorting)
- N -- pep8-naming
- UP -- pyupgrade
- B -- flake8-bugbear
- SIM -- flake8-simplify
mypy (Type Checker)¶
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true
[[tool.mypy.overrides]]
module = ["google", "google.genai", "google.genai.*"]
ignore_missing_imports = true
The library is PEP 561 compliant (py.typed marker) and fully type-safe under strict mypy checking.
Pre-commit Hooks¶
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    hooks:
      - id: ruff # Lint check with --fix
      - id: ruff-format # Format check
  - repo: https://github.com/pre-commit/mirrors-mypy
    hooks:
      - id: mypy # Strict type check
Development Environment Setup¶
# Clone the repository
git clone https://github.com/roomkit-live/roomkit.git
cd roomkit
# Install all dev dependencies (requires uv)
make install
# or: uv sync --extra dev
# Run the full quality gate
make all
# or individually:
make lint # Check linting
make format # Auto-format code
make typecheck # Strict type checking
make test # Run test suite
make coverage # Coverage report (90% threshold)
# Run examples
uv run python examples/quickstart.py
# Build documentation
uv run mkdocs serve # Local dev server
uv run mkdocs build # Build to site/
Key Architectural Patterns¶
Mixin Composition¶
The RoomKit class uses multiple inheritance to separate concerns while maintaining a single public interface:
class RoomKit(InboundMixin, ChannelOpsMixin, RoomLifecycleMixin, HelpersMixin):
"""Central orchestrator tying rooms, channels, hooks, and storage."""
Each mixin declares the shared state it requires via class-level type annotations (e.g., _store: ConversationStore), and the RoomKit.__init__ initializes all shared state.
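The pattern can be sketched in a few lines. The class names below (`StoreMixin`, `LifecycleMixin`, `Orchestrator`) and the dict-based room representation are hypothetical; only the technique, annotation-only declarations in the mixins and a single initializing constructor in the composed class, reflects the description above.

```python
class StoreMixin:
    # Declared but not assigned: the type checker learns the attribute
    # exists, while the composed class remains responsible for creating it.
    _rooms: dict[str, dict[str, str]]

    def create_room(self, room_id: str) -> dict[str, str]:
        room = {"id": room_id, "status": "active"}
        self._rooms[room_id] = room
        return room


class LifecycleMixin:
    _rooms: dict[str, dict[str, str]]

    def close_room(self, room_id: str) -> dict[str, str]:
        room = self._rooms[room_id]
        room["status"] = "closed"
        return room


class Orchestrator(StoreMixin, LifecycleMixin):
    """Composed class: the only place shared state is initialized."""

    def __init__(self) -> None:
        self._rooms = {}


kit = Orchestrator()
kit.create_room("room-1")
closed = kit.close_room("room-1")
```

Because the annotations create no class attribute, the mixins cannot accidentally share one mutable dict across instances.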
Data-Driven Transport Channels¶
Instead of subclassing Channel for each transport type, a single TransportChannel class handles all transports via configuration:
def SMSChannel(channel_id, *, provider=None, from_number=None):
    return TransportChannel(
        channel_id,
        ChannelType.SMS,
        provider=provider,
        capabilities=SMS_CAPABILITIES,
        recipient_key="phone_number",
        defaults={"from_": from_number},
    )

def RCSChannel(channel_id, *, provider=None):
    return TransportChannel(
        channel_id,
        ChannelType.RCS,
        provider=provider,
        capabilities=RCS_CAPABILITIES,
        recipient_key="phone_number",
    )
The recipient_key names the binding metadata field that holds the delivery address. Entries in defaults whose value is None are resolved from binding metadata at delivery time.
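One way to picture that resolution step is the sketch below. The helper `resolve_delivery_kwargs` and the `"to"` key are hypothetical and only illustrate the lookup order; the real TransportChannel may structure this differently.

```python
from typing import Any


def resolve_delivery_kwargs(
    binding_metadata: dict[str, Any],
    recipient_key: str,
    defaults: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: build provider arguments for one delivery."""
    if recipient_key not in binding_metadata:
        raise KeyError(f"binding metadata has no {recipient_key!r} field")
    kwargs: dict[str, Any] = {"to": binding_metadata[recipient_key]}
    for name, value in defaults.items():
        # A None default is a placeholder: look it up on the binding instead.
        kwargs[name] = binding_metadata.get(name) if value is None else value
    return kwargs


binding = {"phone_number": "+15550100", "from_": "+15550199"}

# from_number was not given to the factory, so the binding supplies it.
per_binding = resolve_delivery_kwargs(binding, "phone_number", {"from_": None})

# from_number was fixed at channel creation, so it wins over the binding.
fixed = resolve_delivery_kwargs(binding, "phone_number", {"from_": "+15550111"})
```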
Discriminated Union Content¶
Event content uses Pydantic's discriminator pattern for type-safe polymorphism without isinstance checks:
EventContent = Annotated[
TextContent | RichContent | MediaContent | ...,
Field(discriminator="type"),
]
Each content type has a type literal field (e.g., type: Literal["text"] = "text").
Side Effects Model¶
Hooks and intelligence channels can produce three types of side effects:
- InjectedEvent -- Synthetic events delivered to specific channels (e.g., challenge messages)
- Task -- Work items with status tracking for human follow-up
- Observation -- Intelligence findings with category and confidence score
All side effects are collected during the pipeline and persisted after broadcast completes.
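The collect-then-persist ordering can be sketched as follows. Everything here is illustrative: `SideEffects`, `run_pipeline`, and the use of plain strings in place of InjectedEvent, Task, and Observation objects are assumptions, not RoomKit's internal types.

```python
from collections.abc import Callable
from dataclasses import dataclass, field


@dataclass
class SideEffects:
    """Accumulator for the three side-effect kinds (strings as stand-ins)."""

    injected_events: list[str] = field(default_factory=list)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)

    def merge(self, other: "SideEffects") -> None:
        self.injected_events.extend(other.injected_events)
        self.tasks.extend(other.tasks)
        self.observations.extend(other.observations)


def run_pipeline(
    event: str,
    hooks: list[Callable[[str], SideEffects]],
    broadcast: Callable[[str], None],
    persist: Callable[[SideEffects], None],
) -> SideEffects:
    collected = SideEffects()
    for hook in hooks:
        collected.merge(hook(event))  # Gather only; nothing is stored yet.
    broadcast(event)
    persist(collected)  # Stored once, after broadcast has completed.
    return collected


log: list[str] = []
result = run_pipeline(
    "hello",
    hooks=[
        lambda e: SideEffects(tasks=[f"follow up on {e!r}"]),
        lambda e: SideEffects(observations=["sentiment: neutral"]),
    ],
    broadcast=lambda e: log.append("broadcast"),
    persist=lambda s: log.append(f"persist {len(s.tasks)} task(s)"),
)
```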
Per-Room AI Configuration¶
AI channels support per-room configuration via binding metadata:
await kit.attach_channel(
    "room-1",
    "ai-bot",
    category=ChannelCategory.INTELLIGENCE,
    metadata={
        "system_prompt": "You are a legal assistant.",
        "temperature": 0.3,
        "max_tokens": 2048,
        "tools": [
            {
                "name": "search_cases",
                "description": "Search legal case database",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "jurisdiction": {"type": "string"},
                    },
                },
            },
        ],
    },
)
Known Technical Debt and Recommendations¶
Partially Implemented Providers¶
The following providers are only partially implemented:
- WhatsApp -- Mock provider only; no production WhatsApp Business API integration
- SendGrid Email -- SendGridConfig exists, but the provider is not yet implemented
Storage Layer¶
- InMemoryStore only -- No persistent storage backend ships with the library. Production deployments need a custom ConversationStore implementation (PostgreSQL, Redis, etc.).
- No migration tooling -- Schema changes to the store interface require manual migration handling.
Missing Infrastructure Patterns¶
- No distributed locking -- InMemoryLockManager uses asyncio.Lock, suitable for single-process deployments only. Multi-process deployments need Redis-based or similar distributed locks.
- No distributed realtime -- InMemoryRealtime is single-process only. Multi-process deployments need Redis pub/sub or similar.
- No event bus -- Events are broadcast in-process. Cross-service event distribution would require an external message broker integration.
- No metrics/tracing -- The FrameworkEvent system provides observability hooks, but there is no built-in integration with OpenTelemetry or similar tracing frameworks.
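The single-process limit on locking follows directly from how asyncio.Lock works: the lock object lives in one interpreter's memory, so a second worker process has its own, unrelated set of locks. A minimal sketch of per-room locking built on asyncio.Lock is shown below; `PerRoomLocks` and its `locked` method are hypothetical names, not the InMemoryLockManager API.

```python
import asyncio
from collections import defaultdict
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class PerRoomLocks:
    """Hypothetical per-room lock registry held in process memory."""

    def __init__(self) -> None:
        # One lock per room id, created on first use.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def locked(self, room_id: str) -> AsyncIterator[None]:
        async with self._locks[room_id]:
            yield


async def demo() -> list[str]:
    locks = PerRoomLocks()
    order: list[str] = []

    async def handle(room_id: str, label: str) -> None:
        async with locks.locked(room_id):
            order.append(f"{label}:start")
            await asyncio.sleep(0)  # Yield control while holding the lock.
            order.append(f"{label}:end")

    # Two messages for the same room are processed one after the other.
    await asyncio.gather(handle("room-1", "a"), handle("room-1", "b"))
    return order
```

A distributed replacement would keep the same context-manager shape but acquire the lock in a shared service instead of local memory.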
Placeholder Channel Types¶
- All source files stay under 500 lines, maintaining readability
- The __init__.py exports a large public surface (~150 symbols) -- users can also use sub-module imports (e.g., from roomkit.channels import SMSChannel) for more granular control
- The ChannelType enum includes push and system values that have no channel implementations yet (voice is now implemented via VoiceChannel)