RFC: RoomKit — Multi-Channel Conversation Framework
Document Info
| Field | Value |
|---|---|
| Status | Draft - Under Discussion (v11) |
| Author | Sylvain Boily |
| Contributions | TchatNSign |
| Created | 2026-01-27 |
| Last Updated | 2026-01-27 |
1. Vision
A Python framework for multi-channel conversations between humans, AI, and programs — across SMS, Email, WhatsApp, HTTP/WebSocket, and any custom channel.
The framework manages Rooms (conversations) where Channels connect. A Channel can be a human on SMS, a human in a browser, an AI agent, or a program that observes and enforces rules. Channels can be added, removed, muted, and reconfigured dynamically. Hooks intercept events for filtering, blocking, or enrichment.
The framework provides primitives. Business logic belongs to the integrator.
Use Cases
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 1: Human ↔ Human (cross-channel) │
│ │
│ [User A: SMS] ←──── Room ────→ [User B: Browser/HTTP] │
│ │
│ A customer on SMS talks with an advisor in a web dashboard. │
│ Each sees messages in their channel. The Room bridges them. │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 2: Human ↔ AI │
│ │
│ [User: SMS] ←──── Room ────→ [AI: pydantic-ai] │
│ │
│ A customer on SMS talks with an AI agent. │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 3: Human ↔ Human + AI assistant │
│ │
│ [User: SMS] ←──── Room ────→ [Advisor: Browser] │
│ ↑ │
│ └──→ [AI: suggestions visible to advisor] │
│ │
│ Human conversation. AI is added to help the advisor. │
│ AI writes with visibility restricted to agent channels only. │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 4: Human ↔ Human + Observer (rules/compliance) │
│ │
│ [User: SMS] ←──── Room ────→ [Advisor: Browser] │
│ ↑ │
│ └──→ [Observer: blocks sensitive info] │
│ │
│ A sync hook watches events. If someone sends a SIN, the hook │
│ BLOCKS the message before it reaches anyone. The observer channel │
│ (read-only) can also produce tasks and observations as side effects.│
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 5: Dynamic channel management │
│ │
│ [User: SMS] ←── Room ──→ [AI: read_write, visibility=all] │
│ │ │
│ │ (AI can't help → escalate) │
│ │ │
│ [User: SMS] ←── Room ──→ [AI: muted] + [Advisor: Browser] │
│ │ │
│ │ (Advisor wants AI help) │
│ │ │
│ [User: SMS] ←── Room ──→ [AI: visibility=agents] + [Advisor] │
│ │
│ Channels are attached/detached/muted/reconfigured dynamically. │
│ The framework provides the primitives. Business logic decides when. │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 6: Sensitivity scanning — block + targeted responses │
│ │
│ [Customer: SMS] ←── Room ──→ [Advisor: WebSocket] │
│ ↑ │
│ sync hook: sensitivity_scanner │
│ │
│ Customer sends SIN (Social Insurance Number) via SMS. │
│ A sync hook detects the sensitive data BEFORE broadcast. │
│ The hook BLOCKS the original message and INJECTS: │
│ → to SMS (customer): "Your message was blocked — SIN not allowed" │
│ → to WebSocket (advisor): "Customer tried to send sensitive data" │
│ The original message is stored in timeline with status=BLOCKED │
│ (audit trail) but never delivered to the advisor. │
│ An observation is produced for compliance logging. │
└─────────────────────────────────────────────────────────────────────┘
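The sensitivity scanner in Use Case 6 reduces to a pure decision function. The sketch below is a minimal illustration, not RoomKit's hook API: `HookResult`, its `action`/`inject` fields, and the channel ids are hypothetical. Canadian SINs are nine digits that pass the Luhn checksum, so the sketch checks the checksum to avoid blocking arbitrary nine-digit numbers.

```python
from __future__ import annotations

import re
from dataclasses import dataclass, field


@dataclass
class HookResult:
    """Hypothetical sync-hook decision (not RoomKit's confirmed type)."""
    action: str                                            # "allow" or "block"
    reason: str | None = None
    inject: dict[str, str] = field(default_factory=dict)   # channel_id -> text to inject


# Nine digits, optionally grouped 3-3-3 with spaces or dashes.
SIN_PATTERN = re.compile(r"\b(\d{3})[ -]?(\d{3})[ -]?(\d{3})\b")


def luhn_valid(digits: str) -> bool:
    """Return True if the digit string passes the Luhn checksum."""
    total = 0
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:      # double every second digit from the right
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def contains_sin(text: str) -> bool:
    return any(luhn_valid("".join(m.groups())) for m in SIN_PATTERN.finditer(text))


def sensitivity_scanner(text: str, sender_channel: str, advisor_channel: str) -> HookResult:
    """Block the message and prepare targeted notices if it contains a SIN."""
    if not contains_sin(text):
        return HookResult(action="allow")
    return HookResult(
        action="block",
        reason="sin_detected",
        inject={
            sender_channel: "Your message was blocked: SIN not allowed",
            advisor_channel: "Customer tried to send sensitive data",
        },
    )
```

Storing the blocked original with status=BLOCKED and emitting the compliance observation are left to the framework and are not shown.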
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 7: Ambiguous identity — shared phone number (family) │
│ │
│ [SMS: +15551234567] ←── Room ──→ [Advisor: WebSocket] │
│ │ │
│ ├── Marie Tremblay (mother) ← previous conversation │
│ ├── Jean Tremblay (father) │
│ └── Sophie Tremblay (daughter) │
│ │
│ CRM has 3 contacts with the same phone number (family). │
│ SMS arrives — IdentityResolver returns "ambiguous" (3 candidates). │
│ A sync hook (on_identity_ambiguous) decides what to do: │
│ → Strategy A: Create room with pending participant, advisor │
│ sees candidates and picks one manually. │
│ → Strategy B: Auto-resolve from context (most recent room). │
│ → Strategy C: Challenge — ask customer to self-identify via SMS. │
│ Framework provides the pipeline. Integrator picks the strategy. │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 8: AI ↔ AI (multi-agent collaboration) │
│ │
│ [AI: analyst] ←── Room ──→ [AI: writer] │
│ ↑ ↑ │
│ │ └── visibility=all │
│ └── visibility=all │
│ │
│ Two AI agents collaborate in a Room. The analyst reads documents │
│ and produces structured findings. The writer reads the analyst's │
│ findings and generates a client-ready report. │
│ │
│ Each AI responds to the other's events via on_event() — creating │
│ an event chain. The framework tracks chain_depth on every event. │
│ When chain_depth >= max_event_chain_depth (default 10), the chain │
│ is stopped: the next event is blocked (status=BLOCKED, │
│ blocked_by="event_chain_depth_limit"), an observation is emitted, │
│ and side effects still flow. │
│ │
│ Variations: │
│ → Add a human [Advisor: Browser] to review the final output │
│ → Add an [Observer: read] for quality scoring (side effects) │
│ → Mute the writer until the analyst is done (dynamic control) │
│ │
│ Same primitives. No special "multi-agent" API. Rooms, channels, │
│ permissions, and chain_depth handle it. │
└─────────────────────────────────────────────────────────────────────┘
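The chain-depth guard from Use Case 8 can be sketched in a few lines. This assumes each reaction event carries its parent's `chain_depth + 1`; the `RoomEvent` shape and `child_event` helper are hypothetical stand-ins, and the observation and side-effect handling are omitted.

```python
from __future__ import annotations

from dataclasses import dataclass

MAX_EVENT_CHAIN_DEPTH = 10  # default stated in Use Case 8


@dataclass
class RoomEvent:
    """Hypothetical, trimmed event shape."""
    text: str
    chain_depth: int = 0
    status: str = "delivered"
    blocked_by: str | None = None


def child_event(parent: RoomEvent, text: str, max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> RoomEvent:
    """Build the event a channel emits in reaction to `parent`.

    Depth grows by one per reaction. Once the parent has reached the limit,
    the reaction is recorded as blocked instead of being broadcast.
    """
    depth = parent.chain_depth + 1
    if parent.chain_depth >= max_depth:
        return RoomEvent(text, depth, status="blocked", blocked_by="event_chain_depth_limit")
    return RoomEvent(text, depth)


# Analyst and writer keep answering each other until the guard trips.
event = RoomEvent("documents uploaded")  # human-originated, depth 0
speakers = ["analyst", "writer"]
turn = 0
while event.status != "blocked":
    event = child_event(event, f"{speakers[turn % 2]} reply")
    turn += 1
```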
What This Framework Is
- A Room-based conversation manager that connects any combination of humans, AI, and programs
- A channel abstraction where SMS, HTTP, AI, and custom channels share the same interface
- A permission system (read/write/mute/visibility) that controls channel behavior
- A hook system for intercepting, blocking, modifying, and enriching events
- A provider abstraction where Twilio/Sinch/SendGrid are interchangeable implementations
What This Framework Is NOT
- Not a CPaaS provider (Twilio and Sinch own the transport)
- Not an AI framework (pydantic-ai handles agent logic)
- Not a chat app (it's the plumbing behind any conversation-based app)
- Not opinionated about business logic — the framework provides primitives (access, mute, visibility), the integrator decides when and why to use them
2. Quick Start (5-Minute Example)
SMS inbound → Room created → AI responds → SMS outbound. Copy-paste it and fill in your credentials.
```python
from roomkit import RoomKit
from roomkit.channels.sms import SMSChannel
from roomkit.channels.ai import AIChannel
from roomkit.providers.sms.twilio import TwilioSMSProvider
from roomkit.providers.ai.pydantic_ai import PydanticAIProvider
from roomkit.store.memory import InMemoryStore
from roomkit.config.providers import TwilioConfig
from pydantic_ai import Agent

# 1. Providers
sms_provider = TwilioSMSProvider(config=TwilioConfig(
    account_sid="AC...", auth_token="secret", from_number="+15559876543",
))
ai_provider = PydanticAIProvider(agent=Agent(
    model="anthropic:claude-sonnet-4-5",
    instructions="You are a financial assistant. Reply in French and be concise.",
))

# 2. Channels
sms = SMSChannel(provider=sms_provider)
ai = AIChannel(provider=ai_provider, name="support")

# 3. RoomKit
fw = RoomKit(store=InMemoryStore())
fw.register_channel(sms)
fw.register_channel(ai)

# 4. When a new room is created, attach the AI automatically
@fw.hook(trigger="on_room_created", execution="async")
async def auto_attach_ai(room):
    await fw.attach_channel(room.id, "support", access="read_write", visibility="all")

# 5. Process inbound: call this from your (async) webhook handler
async def handle_sms_webhook(payload: dict):
    # e.g. payload = {"From": "+15551234567", "Body": "Bonjour"}
    # → Room created → AI receives message → AI responds → SMS delivered to customer
    return await fw.process_inbound("sms", payload)
```
That's it. The framework handles room creation, identity resolution, event routing, AI response generation, and SMS delivery. Everything else — hooks, permissions, dynamic channels, identity ambiguity — builds on top of this.
3. Architecture Overview
┌─────────────────────────────────────────────────────────────────────┐
│ Integration Surfaces │
│ │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ REST API │ │ MCP Server │ │
│ │ (humans, │ │ (AI agents, │ │
│ │ dashboards, │ │ tools, │ │
│ │ systems) │ │ resources) │ │
│ └───────┬───────┘ └───────┬───────┘ │
└───────────┼─────────────────────────┼────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ RoomKit Core │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Room Manager │ │ Event Router │ │ Identity │ │
│ │ │ │ │ │ Resolver │ │
│ └──────────────┘ └──────┬───────┘ └──────────────┘ │
│ │ │
│ ┌──────┴───────┐ │
│ │ Hook Engine │ ← sync/async hooks │
│ │ (pipeline) │ intercept, block, modify, │
│ └──────┬───────┘ enrich events │
│ │ │
│ ┌──────────────────────────────────┐ │
│ │ Conversation Store │ │
│ │ (Rooms, Events, Identities) │ │
│ └──────────────────────────────────┘ │
└────────────────────────────┬────────────────────────────────────────┘
│
Channel Interface │ (same for ALL)
│
┌──────────┬──────────┬┴─────────┬──────────┬──────────┐
▼ ▼ ▼ ▼ ▼ ▼
┌────────┐┌────────┐┌─────────┐┌──────────┐┌────────┐┌─────────┐
│ SMS ││ Email ││ HTTP/ ││ AI ││Observer││ Custom │
│Channel ││Channel ││ WS ││ Channel ││Channel ││ Channel │
│ ││ ││ Channel ││ ││ ││ │
└───┬────┘└───┬────┘└─────────┘└──────────┘└────────┘└─────────┘
│ │
Provider Provider Provider abstraction:
Layer Layer Channel type ≠ Provider
│ │ SMS can be Twilio OR Sinch
▼ ▼ Email can be SendGrid OR SMTP
┌────────┐┌────────┐
│Twilio ││SndGrid │
│Sinch ││SMTP │
│Vonage ││Mailgun │
└────────┘└────────┘
Key separations:
| Layer | Responsibility |
|---|---|
| Integration Surfaces | REST API (for humans/systems) + MCP Server (for AI agents) |
| RoomKit Core | Room lifecycle, event routing, hook pipeline, permissions, store, identity |
| Channel Interface | Unified abstraction — everything is a Channel |
| Provider Layer | Interchangeable implementations (Twilio, Sinch, SendGrid...) |
3. Core Concepts
3.1 The Room
A conversation space. Channels connect to it. Events flow through it. Hooks intercept them.
```
Room
├── id: str
├── organization_id: str              (multi-tenant isolation)
├── status: RoomStatus                (active, paused, closed, archived)
├── channels: list[ChannelBinding]    (attached channels: dynamic, with permissions)
├── participants: list[Participant]   (humans in the conversation)
├── timeline: list[RoomEvent]         (everything that happened)
├── latest_index: int                 (highest event index, for unread calculations)
├── timers: RoomTimers | None         (auto-transitions, inspired by Twilio)
├── metadata: dict[str, Any]          (custom data)
├── created_at: datetime
└── updated_at: datetime
```
Room Timers (inspired by Twilio Conversations):
```
RoomTimers
├── inactive_after: timedelta | None   (auto-transition active → paused after this much inactivity)
├── closed_after: timedelta | None     (auto-transition paused → closed after this much additional inactivity)
└── last_activity_at: datetime         (timestamp of the last message event)
```
Example: RoomTimers(inactive_after=timedelta(minutes=30), closed_after=timedelta(hours=24))
- After 30 min of no messages → Room moves to paused, emits room_paused event
- After 24h more of no messages → Room moves to closed, emits room_closed event
- Any new message resets the timer and moves back to active
Timers are optional. If not set, the Room stays active until explicitly closed.
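A minimal sketch of the timer rules above, assuming a periodic sweep calls `next_status` for each room and that `closed_after` counts from the moment the room paused (so a room closes after `inactive_after + closed_after` of total inactivity). `next_status` is a hypothetical helper; resetting `last_activity_at` and reactivating the room on a new message would happen on the message path.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class RoomStatus(str, Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    CLOSED = "closed"
    ARCHIVED = "archived"


@dataclass
class RoomTimers:
    last_activity_at: datetime
    inactive_after: timedelta | None = None
    closed_after: timedelta | None = None


def next_status(status: RoomStatus, timers: RoomTimers | None, now: datetime) -> RoomStatus:
    """Return the status a room should have at `now`, given its inactivity timers."""
    if timers is None or status in (RoomStatus.CLOSED, RoomStatus.ARCHIVED):
        return status  # no timers, or already terminal: nothing to do
    idle = now - timers.last_activity_at
    if timers.inactive_after is None or idle < timers.inactive_after:
        return status
    # inactive_after has elapsed, so the room is at least paused.
    if timers.closed_after is not None and idle >= timers.inactive_after + timers.closed_after:
        return RoomStatus.CLOSED
    return RoomStatus.PAUSED


t0 = datetime(2026, 1, 27, 9, 0, tzinfo=timezone.utc)
timers = RoomTimers(t0, inactive_after=timedelta(minutes=30), closed_after=timedelta(hours=24))
```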
Key properties:
- Channel-agnostic: the Room doesn't distinguish SMS from AI; it just routes events
- Multi-channel: SMS + Browser + AI + Observer simultaneously
- Dynamic: channels can be attached/detached/muted/reconfigured at any time
- Observable: hooks and read-access channels see everything
- Persistent: survives across sessions, channel switches, and escalations
3.2 Channel (Unified Abstraction)
A Channel is anything that interacts with a Room. Human on SMS, human in browser, AI agent, compliance program — all are Channels.
```
Channel
├── id: str
├── type: ChannelType                    (sms, email, http, websocket, ai, custom)
├── category: ChannelCategory            (transport | intelligence | integration)
├── direction: ChannelDirection          (inbound | outbound | bidirectional)
├── media_types: list[ChannelMediaType]  (what this channel carries: text, audio, video)
├── capabilities: ChannelCapabilities    (what this channel supports)
├── provider: str | None                 (twilio, sinch, sendgrid: for transport channels)
├── status: ChannelStatus                (connected, disconnected, error)
└── info: dict[str, Any]                 (channel instance information: from_number, model, etc.)
```
Channel media types:
```
ChannelMediaType (enum)
├── TEXT    (SMS, Email, Chat, WhatsApp text)
├── AUDIO   (Phone calls, voice messages, WhatsApp audio)
└── VIDEO   (Video calls, WhatsApp video)
```
| Channel | media_types | Notes |
|---|---|---|
| SMS | [text] | |
| Email | [text] | Attachments are media, but the channel carries text |
| HTTP/WebSocket | [text] | Could support audio/video via WebRTC later |
| WhatsApp | [text, audio, video] | |
| Phone (future) | [audio] | |
| Video (future) | [video, audio, text] | |
| AI | [text] | Text generation; could produce audio/video in the future |
Media types serve three purposes:
1. Routing: the framework knows which channels can carry which content
2. Capability matching: AI can adapt generation to the media type
3. Future-proofing: audio/video channels plug in without model changes
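Routing by media type amounts to a set-membership filter. In this sketch, the `CHANNEL_MEDIA` lookup table and the `channels_for` helper are hypothetical stand-ins for reading each attached channel's `media_types` attribute; the entries mirror the table above.

```python
from __future__ import annotations

from enum import Enum


class ChannelMediaType(str, Enum):
    TEXT = "text"
    AUDIO = "audio"
    VIDEO = "video"


# Hypothetical lookup mirroring the media_types table (future channels included).
CHANNEL_MEDIA: dict[str, set[ChannelMediaType]] = {
    "sms": {ChannelMediaType.TEXT},
    "email": {ChannelMediaType.TEXT},
    "websocket": {ChannelMediaType.TEXT},
    "whatsapp": {ChannelMediaType.TEXT, ChannelMediaType.AUDIO, ChannelMediaType.VIDEO},
    "phone": {ChannelMediaType.AUDIO},
    "ai": {ChannelMediaType.TEXT},
}


def channels_for(media: ChannelMediaType, attached: list[str]) -> list[str]:
    """Keep only the attached channels that can carry `media`, preserving order."""
    return [ch for ch in attached if media in CHANNEL_MEDIA.get(ch, set())]
```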
Channel info:
Each channel instance exposes an info property with its configuration details:
```python
from typing import Any


class SMSChannel(Channel):
    @property
    def info(self) -> dict[str, Any]:
        return {"provider": self.provider.name, "from_number": self.provider.from_number}


class AIChannel(Channel):
    @property
    def info(self) -> dict[str, Any]:
        return {"provider": self.provider.name, "model": self.provider.model_name}
```
This info is surfaced via the REST API and MCP resources, so integrators can query "what channels are in this room and what are they configured with?"
Three channel categories:
| Category | Purpose | Examples |
|---|---|---|
| Transport | Carries messages to/from humans | SMS, Email, HTTP/WebSocket, WhatsApp |
| Intelligence | Processes events, produces responses/insights | AI agent (pydantic-ai), sentiment analyzer |
| Integration | Connects to external systems | CRM updater, calendar, notification service |
Categories are informational — they help the integrator organize channels. The framework treats all channels the same through the unified interface:
```python
from __future__ import annotations

from abc import ABC
from typing import Any


class Channel(ABC):
    type: ChannelType
    category: ChannelCategory
    direction: ChannelDirection
    media_types: list[ChannelMediaType]
    capabilities: ChannelCapabilities

    @property
    def info(self) -> dict[str, Any]:
        """Channel instance information (provider config, from_number, model, etc.).
        Surfaced via REST API and MCP resources."""
        return {}

    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        """INBOUND: called when something arrives from outside (webhook, WS message).
        Parses the raw payload into a RoomEvent."""
        ...

    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        """OUTBOUND: called when the framework needs to push an event to this channel's outside.
        Sends an SMS, pushes to a WebSocket, etc."""
        ...

    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        """READ: called when a room event occurs and this channel has read access.
        The channel reacts internally (AI generates a response, observer analyzes)."""
        ...
```
3.3 Channel Binding & Permissions
When a channel is attached to a Room, it gets a ChannelBinding that defines its permissions in that Room. This is the core primitive system.
```
ChannelBinding
├── channel_id: str
├── room_id: str
├── access: Access                    (read | write | read_write)
├── muted: bool                       (temporary: blocks writing, preserves reading)
├── write_visibility: Visibility      (when writing: who sees the events?)
│   ├── all                           (all channels in the room)
│   ├── channels(["http", "ai_x"])    (only specific channels)
│   └── internal                      (stored in timeline, not broadcast)
├── participant_id: str | None        (which human, if any)
├── last_read_index: int | None       (read horizon: last event index this channel has read)
├── attached_at: datetime
└── metadata: dict[str, Any]
```
Read horizon (inspired by Twilio): last_read_index tracks how far each channel/participant has read. This enables:
- Unread count: room.latest_index - binding.last_read_index
- Read receipts: when a channel advances its read index, emit a read_receipt event
- UI badges: "3 unread messages"
Updated via:
```python
await fw.mark_read(room_id, channel_id, index=42)  # mark as read up to event 42
await fw.mark_all_read(room_id, channel_id)        # mark everything as read
```
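The read-horizon arithmetic can be sketched as follows. Assumptions not stated in the RFC: event indexes start at 0, `last_read_index=None` means nothing has been read yet, and the horizon never moves backwards or past `latest_index`. `Binding`, `unread_count`, and `advance_read_horizon` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Binding:
    channel_id: str
    last_read_index: int | None = None  # None: nothing read yet


def unread_count(latest_index: int, binding: Binding) -> int:
    """Unread = latest_index - last_read_index (everything if nothing was read)."""
    if binding.last_read_index is None:
        return latest_index + 1  # indexes assumed to start at 0
    return max(0, latest_index - binding.last_read_index)


def advance_read_horizon(binding: Binding, index: int, latest_index: int) -> bool:
    """Move the horizon forward; return True when it moved, i.e. when a
    read_receipt event should be emitted. It never moves backwards."""
    index = min(index, latest_index)
    if binding.last_read_index is not None and index <= binding.last_read_index:
        return False
    binding.last_read_index = index
    return True


advisor = Binding("ws_advisor")
```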
The three primitives:
| Primitive | What it controls | Framework provides | Integrator decides |
|---|---|---|---|
| Access | Can this channel read events? Write events? | read, write, read_write | When to set each access level |
| Muted | Temporarily block writing (reading continues) | mute(), unmute() | When to mute/unmute |
| Visibility | When this channel writes, who receives? | all, channels([...]), internal | What visibility each channel gets |
Rules:
```
Reading:
  access=read or access=read_write  → channel receives events via on_event()
  access=write                      → channel does NOT receive events (write-only, rare use case)

Writing:
  access=write or access=read_write → channel can produce Room events
  muted=true                        → writing is temporarily blocked (overrides access)
  write_visibility                  → determines which other channels see the events

Side Effects (always allowed):
  Tasks, observations, metadata updates → always produced, regardless of access/muted
  A read-only channel can still flag compliance issues
  A muted channel can still observe and produce insights
  Side effects are stored separately and exposed via API/hooks
```
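The reading and writing rules combine into two predicates and a recipient filter. This is a sketch under simplifying assumptions: visibility is modeled as `"all"`, `"internal"`, or a list of channel ids, and a recipient must itself have read access. The names are hypothetical, and side effects, which bypass these checks, are not modeled.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Binding:
    channel_id: str
    access: str = "read_write"              # "read" | "write" | "read_write"
    muted: bool = False
    visibility: str | list[str] = "all"     # "all" | "internal" | list of channel ids


def can_read(b: Binding) -> bool:
    return b.access in ("read", "read_write")


def can_write(b: Binding) -> bool:
    # muted overrides access: writing is blocked, reading is not
    return b.access in ("write", "read_write") and not b.muted


def recipients(writer: Binding, bindings: list[Binding]) -> list[str]:
    """Channel ids that receive an event written by `writer`.

    Empty when the write is blocked, and also for visibility="internal"
    (the event is stored in the timeline but not broadcast)."""
    if not can_write(writer) or writer.visibility == "internal":
        return []
    targets = [b for b in bindings if b.channel_id != writer.channel_id and can_read(b)]
    if writer.visibility != "all":
        targets = [b for b in targets if b.channel_id in writer.visibility]
    return [b.channel_id for b in targets]


sms = Binding("sms")
ws = Binding("ws_advisor")
ai = Binding("ai", visibility=["ws_advisor"])   # "Assistant" pattern
obs = Binding("observer", access="read")        # "Observer" pattern
room = [sms, ws, ai, obs]
```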
Named patterns: The framework doesn't impose these, but they name the common configurations:
| Pattern | Configuration | Description |
|---|---|---|
| Direct | access=read_write, visibility=all | Channel speaks to everyone. AI talks directly to the customer. |
| Assistant | access=read_write, visibility=channels(["ws_advisor"]) | Channel writes, but only the advisor sees it. AI suggests, advisor decides. |
| Observer | access=read | Read-only. Analyzes events, produces side effects only (tasks, observations). Cannot write messages. |
| Muted | muted=true | Temporarily silenced. Still reads, still produces side effects. Writing is blocked until unmuted. |
| Internal | access=read_write, visibility=internal | Writes are stored in the timeline but not broadcast. Useful for audit logging. |
These are vocabulary, not framework concepts. The integrator combines primitives however they want — the patterns just name the most frequent combinations.
3.4 Two Output Paths
When a channel processes an event, it produces a ChannelOutput with two distinct paths:
```
ChannelOutput
│
├── Room Events (messages, responses)
│   ├── events: list[RoomEvent]
│   │
│   │   Subject to permissions:
│   │   - Channel must have write access
│   │   - Channel must not be muted
│   │   - Events are broadcast according to write_visibility
│   │   - Events are stored in the Room timeline
│   │
│   └── The Event Router enforces all of this. The channel just produces output.
│
└── Side Effects (always allowed)
    ├── tasks: list[Task]                 (actionable items for external systems)
    ├── observations: list[Observation]   (passive insights: sentiment, flags)
    └── metadata_updates: dict            (enrich Room metadata)

    Not subject to access/mute permissions.
    Routing suggestions (e.g., escalate to human) are expressed as Tasks.
    A read-only or muted channel can still produce side effects.
```
Why two paths?
- A muted AI can't talk to the customer but CAN still flag "this customer is at risk of churning"
- A read-only compliance channel can't send messages but CAN create tasks and flag PII
- Muting silences the voice, not the brain
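The two output paths can be sketched as a split that gates room events on permissions while passing side effects through unconditionally. `ChannelOutput` is trimmed to plain strings here, and `RoutingResult` with its `dropped` list is a hypothetical shape; visibility filtering is omitted.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ChannelOutput:
    events: list[str] = field(default_factory=list)           # room events (message text only)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)
    metadata_updates: dict[str, object] = field(default_factory=dict)


@dataclass
class RoutingResult:
    broadcast: list[str]                 # events allowed into the room
    dropped: list[str]                   # events refused by permissions
    side_effects: dict[str, object]      # always forwarded


def route_output(output: ChannelOutput, can_write: bool, muted: bool) -> RoutingResult:
    """Gate room events on write access and mute state; never gate side effects."""
    allowed = can_write and not muted
    return RoutingResult(
        broadcast=list(output.events) if allowed else [],
        dropped=[] if allowed else list(output.events),
        side_effects={
            "tasks": list(output.tasks),
            "observations": list(output.observations),
            "metadata_updates": dict(output.metadata_updates),
        },
    )


# A muted AI: the message is held back, the churn flag still gets through.
out = ChannelOutput(events=["Here is a discount!"], observations=["churn_risk"])
muted_result = route_output(out, can_write=True, muted=True)
```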
3.5 Provider Abstraction
A Channel type and its Provider are separate concepts. This applies to all channels — including AI.
```
Channel Type:   SMS            Email          WhatsApp
                │              │              │
Provider:       ├─ Twilio      ├─ SendGrid    ├─ Meta Cloud API
                ├─ Sinch       ├─ SMTP        └─ Twilio
                ├─ Vonage      ├─ Mailgun
                └─ Custom      └─ SES

Channel Type:   AI
                │
Provider:       ├─ PydanticAIProvider  (wraps pydantic-ai → Anthropic, OpenAI, Gemini, etc.)
                ├─ AnthropicProvider   (Anthropic SDK directly)
                ├─ OpenAIProvider      (OpenAI SDK directly)
                ├─ LangChainProvider   (LangChain)
                └─ Custom              (integrator writes their own)
```
Transport provider example (SMS):
```python
from __future__ import annotations

from abc import ABC, abstractmethod


class SMSChannel(Channel):
    type = ChannelType.SMS
    category = ChannelCategory.TRANSPORT
    direction = ChannelDirection.BIDIRECTIONAL
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, provider: SMSProvider):
        self.provider = provider


class SMSProvider(ABC):
    name: str
    from_number: str

    @abstractmethod
    async def send_sms(self, to: str, body: str) -> ProviderResult: ...

    @abstractmethod
    async def parse_webhook(self, payload: dict) -> InboundMessage: ...


class TwilioSMSProvider(SMSProvider):
    name = "twilio"

    # TwilioConfig carries account_sid, auth_token, from_number (see Quick Start)
    def __init__(self, config: TwilioConfig): ...


class SinchSMSProvider(SMSProvider):
    name = "sinch"

    def __init__(self, app_id: str, app_secret: str, from_number: str): ...
```
Intelligence provider example (AI):
The AI channel follows the exact same pattern. The AIProvider ABC defines a generate() interface. The integrator chooses which library to use: pydantic-ai, LangChain, raw SDKs, or anything custom.
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


class AIChannel(Channel):
    type = ChannelType.AI
    category = ChannelCategory.INTELLIGENCE
    direction = ChannelDirection.BIDIRECTIONAL
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, provider: AIProvider, name: str = "ai"):
        self.provider = provider
        self.name = name


class AIProvider(ABC):
    """Abstract AI provider: wraps any AI library or SDK."""

    name: str        # e.g., "pydantic-ai", "anthropic", "openai"
    model_name: str  # e.g., "claude-sonnet-4-5", "gpt-4o"

    @abstractmethod
    async def generate(
        self,
        messages: list[AIMessage],
        context: AIContext,
    ) -> AIResponse:
        """Generate a response given conversation history and context."""
        ...


@dataclass
class AIContext:
    """Context provided to the AI provider at generation time."""

    room: RoomContext
    target_capabilities: ChannelCapabilities | None  # target channel constraints
    target_media_types: list[ChannelMediaType]       # what the target can carry
    system_instructions: str | None                  # channel-level instructions
    metadata: dict[str, Any]                         # extra context from integrator


@dataclass
class AIResponse:
    """Response from AI provider."""

    text: str
    tasks: list[Task] = field(default_factory=list)
    observations: list[Observation] = field(default_factory=list)
    # provider_metadata: model, tokens_used, latency_ms, finish_reason, etc.
    provider_metadata: dict[str, Any] = field(default_factory=dict)
```
Framework-provided AI providers:
```python
from __future__ import annotations

from anthropic import AsyncAnthropic
from pydantic_ai import Agent


class PydanticAIProvider(AIProvider):
    """Wraps a pydantic-ai Agent. Supports Anthropic, OpenAI, Gemini, etc."""

    name = "pydantic-ai"

    def __init__(self, agent: Agent):
        self.agent = agent
        self.model_name = str(agent.model)

    async def generate(self, messages, context) -> AIResponse:
        result = await self.agent.run(
            user_prompt=messages[-1].text,
            deps=self._build_deps(context),
            # history excludes the current prompt, which is passed as user_prompt
            message_history=self._convert_history(messages[:-1]),
        )
        return AIResponse(text=result.output, provider_metadata={"model": self.model_name})


class AnthropicProvider(AIProvider):
    """Uses the Anthropic SDK directly (no pydantic-ai)."""

    name = "anthropic"

    def __init__(self, client: AsyncAnthropic, model: str = "claude-sonnet-4-5", max_tokens: int = 1024):
        self.client = client
        self.model_name = model
        self.max_tokens = max_tokens

    async def generate(self, messages, context) -> AIResponse:
        kwargs = {}
        if context.system_instructions:  # only send a system prompt when one is configured
            kwargs["system"] = context.system_instructions
        response = await self.client.messages.create(
            model=self.model_name,
            max_tokens=self.max_tokens,  # required by the Messages API
            messages=self._convert_messages(messages),
            **kwargs,
        )
        return AIResponse(
            text=response.content[0].text,
            provider_metadata={"model": self.model_name, "tokens": response.usage.output_tokens},
        )


class MockAIProvider(AIProvider):
    """For testing: returns canned responses."""

    name = "mock"
    model_name = "mock"

    def __init__(self, responses: list[str]): ...
```
Why this matters:
- Same pattern everywhere: SMS has SMSProvider, AI has AIProvider. Consistent abstraction.
- Library-agnostic: pydantic-ai, LangChain, raw SDK, or custom; the framework doesn't care.
- Swap providers: change from OpenAI to Anthropic without touching Room logic.
- Test with mocks: MockAIProvider for unit tests, no API calls needed.
- Multi-tenant: different organizations can use different AI providers.
- Isolation: the Room and Event Router never see provider details.
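Writing a custom provider means subclassing `AIProvider` and implementing `generate()`. As one example, here is a possible body for the `MockAIProvider` stub above. The types are trimmed stand-ins for the RFC's dataclasses, and the behavior (return the canned responses in order, then keep repeating the last one, recording each call for test assertions) is an assumption, not something the RFC specifies.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


# Trimmed stand-ins for the RFC's types (only the fields this sketch uses).
@dataclass
class AIMessage:
    role: str
    text: str


@dataclass
class AIContext:
    system_instructions: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class AIResponse:
    text: str
    provider_metadata: dict[str, Any] = field(default_factory=dict)


class AIProvider(ABC):
    name: str
    model_name: str

    @abstractmethod
    async def generate(self, messages: list[AIMessage], context: AIContext) -> AIResponse: ...


class MockAIProvider(AIProvider):
    """Returns canned responses in order, then repeats the last one."""

    name = "mock"
    model_name = "mock"

    def __init__(self, responses: list[str]):
        if not responses:
            raise ValueError("responses must not be empty")
        self._responses = list(responses)
        self.calls: list[list[AIMessage]] = []  # recorded for assertions in tests

    async def generate(self, messages: list[AIMessage], context: AIContext) -> AIResponse:
        self.calls.append(list(messages))
        index = min(len(self.calls) - 1, len(self._responses) - 1)
        return AIResponse(text=self._responses[index], provider_metadata={"model": self.model_name})


async def demo() -> list[str]:
    provider = MockAIProvider(["Hello!", "Sure."])
    history = [AIMessage("user", "Hi")]
    return [(await provider.generate(history, AIContext())).text for _ in range(3)]


print(asyncio.run(demo()))  # ['Hello!', 'Sure.', 'Sure.']
```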
3.6 Channel Metadata — Three Levels
Channel-specific information is captured at three levels. Each level serves a different purpose:
┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 1: Channel Instance (static — lives on the Channel) │
│ │
│ Channel.info → dict[str, Any] │
│ │
│ SMS: {"provider": "twilio", "from_number": "+15551234", "account": "AC..."} │
│ AI: {"provider": "pydantic-ai", "model": "claude-sonnet-4-5"} │
│ HTTP: {"endpoint": "wss://app.example.com/ws", "protocols": ["websocket"]} │
│ │
│ Purpose: What IS this channel? Surfaced via REST API / MCP resources. │
│ Set by: Channel implementation (from its provider config). │
│ Lifetime: Channel instance lifetime. Doesn't change per room. │
└──────────────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 2: Channel Binding (per-room — lives on the ChannelBinding) │
│ │
│ ChannelBinding.metadata → dict[str, Any] │
│ │
│ AI in Room A: {"role": "support", "language": "fr", "persona": "formal"} │
│ AI in Room B: {"role": "sales", "language": "en", "persona": "casual"} │
│ SMS in Room: {"customer_name": "Jean", "priority": "high"} │
│ │
│ Purpose: Per-room channel configuration. Same AI channel, different behavior │
│ per room. Integrator-defined. │
│ Set by: Integrator when attaching channel to room. │
│ Lifetime: Binding lifetime (attach to detach). Mutable via API. │
└──────────────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 3: Event Source (per-event — lives on the RoomEvent) │
│ │
│ EventSource.channel_data → ChannelData (typed, per-channel) │
│ EventSource.raw_payload → dict (original provider payload — never lost) │
│ │
│ SMS event: SMSChannelData { from_number, to_number, segments, encoding } │
│ AI event: AIChannelData { model, tokens_used, latency_ms, finish_reason } │
│ HTTP event: HTTPChannelData { session_id, user_agent, ip_address } │
│ │
│ Purpose: What happened in THIS specific event? Provider-specific details. │
│ Set by: Channel implementation when processing inbound or producing output. │
│ Lifetime: Immutable. Part of the event record forever. │
└──────────────────────────────────────────────────────────────────────────────┘
Example: querying all three levels for an SMS channel in a room:
```python
# Level 1: Channel instance info
channel = fw.get_channel("sms_main")
channel.info  # {"provider": "twilio", "from_number": "+15551234"}

# Level 2: Binding metadata (per-room)
binding = await fw.get_binding(room_id, "sms_main")
binding.metadata  # {"customer_name": "Jean", "priority": "high"}

# Level 3: Event source (per-event)
event = room.timeline[-1]
event.source.channel_data  # SMSChannelData(from_number="+15559876", to_number="+15551234", segments=1)
event.source.raw_payload   # {"MessageSid": "SM...", "From": "+15559876", ...} (Twilio raw)
```
3.7 Dynamic Channel Management
The framework provides operations to modify channels at runtime. The integrator decides when to call them:
```python
# Primitives the framework provides:
await fw.attach_channel(room_id, channel_id, access="read_write", visibility="all")
await fw.detach_channel(room_id, channel_id)
await fw.set_access(room_id, channel_id, access="read")
await fw.mute(room_id, channel_id)
await fw.unmute(room_id, channel_id)
await fw.set_visibility(room_id, channel_id, visibility=Visibility.channels(["http"]))
```
Each operation emits a RoomEvent (channel_attached, channel_detached, channel_muted, channel_unmuted, channel_updated) so the timeline records what happened.
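The primitives can be sketched against an in-memory room to replay Use Case 5 (AI alone, then escalation to an advisor, then AI as the advisor's assistant). `Room`, `Binding`, and the `(event_type, channel_id)` timeline tuples are hypothetical simplifications of the real store; the event names follow the list above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Binding:
    access: str = "read_write"
    muted: bool = False
    visibility: str | list[str] = "all"


@dataclass
class Room:
    bindings: dict[str, Binding] = field(default_factory=dict)
    timeline: list[tuple[str, str]] = field(default_factory=list)  # (event_type, channel_id)

    def attach(self, channel_id: str, **settings) -> None:
        if channel_id in self.bindings:
            raise ValueError(f"{channel_id} is already attached")
        self.bindings[channel_id] = Binding(**settings)
        self.timeline.append(("channel_attached", channel_id))

    def detach(self, channel_id: str) -> None:
        del self.bindings[channel_id]
        self.timeline.append(("channel_detached", channel_id))

    def mute(self, channel_id: str) -> None:
        self.bindings[channel_id].muted = True
        self.timeline.append(("channel_muted", channel_id))

    def unmute(self, channel_id: str) -> None:
        self.bindings[channel_id].muted = False
        self.timeline.append(("channel_unmuted", channel_id))

    def set_visibility(self, channel_id: str, visibility: str | list[str]) -> None:
        self.bindings[channel_id].visibility = visibility
        self.timeline.append(("channel_updated", channel_id))


room = Room()
room.attach("sms")
room.attach("ai", visibility="all")        # AI talks directly to the customer
room.mute("ai")                            # AI can't help: escalate
room.attach("ws_advisor")
room.set_visibility("ai", ["ws_advisor"])  # advisor wants AI help
room.unmute("ai")
```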
3.8 Channel Direction (Inbound / Outbound)
A channel has inherent directional capabilities — what it can physically do, independent of per-room permissions:
```
ChannelDirection (enum)
├── INBOUND        (can receive from outside → produce events)
├── OUTBOUND       (can send to outside → deliver events)
└── BIDIRECTIONAL  (both)
```
| Channel | Direction | Why |
|---|---|---|
| SMS | BIDIRECTIONAL | Receives webhooks (inbound), sends SMS (outbound) |
| Email | BIDIRECTIONAL | Receives inbound email, sends outbound email |
| HTTP/WebSocket | BIDIRECTIONAL | Receives user input, pushes events to browser |
| AI | BIDIRECTIONAL | Receives room events (inbound), produces responses (outbound) |
| Observer | INBOUND | Receives events, produces side effects only (no outbound delivery) |
| Notification | OUTBOUND | Only sends notifications (Slack, push), never receives |
| Webhook (custom) | INBOUND | Receives external webhooks, injects events into room |
Direction vs Access:
- Direction = what the channel CAN do (physical/technical, set at channel level)
- Access = what the channel MAY do (permission, set per room binding)

A BIDIRECTIONAL channel can be given access=read (restricting it to inbound only in a room).
An INBOUND channel given access=write makes no sense; the framework validates this.
The Channel ABC declares its direction:
```python
from __future__ import annotations

from abc import ABC
from typing import Any


class Channel(ABC):
    type: ChannelType
    category: ChannelCategory
    direction: ChannelDirection  # what this channel can physically do
    media_types: list[ChannelMediaType]
    capabilities: ChannelCapabilities

    # INBOUND: must implement handle_inbound
    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        """Called when something arrives from outside (webhook, user input)."""
        ...

    # OUTBOUND: must implement deliver
    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        """Called when the framework needs to deliver an event to this channel's outside."""
        ...

    # READ: must implement on_event (for processing/reacting to room events)
    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        """Called when a room event occurs and this channel has read access."""
        ...
```
Three methods, three concerns:
| Method | When called | Direction | Example |
|---|---|---|---|
| handle_inbound | External payload arrives (webhook, WS message) | INBOUND | Twilio webhook → SMS event |
| deliver | Framework needs to push event to channel's outside | OUTBOUND | Send SMS, push to WebSocket |
| on_event | Room event occurs, channel has read access | READ (any direction) | AI processes event, observer analyzes |
on_event vs deliver:
- on_event: The channel reacts to a room event internally (AI generates a response, observer flags something). Produces ChannelOutput.
- deliver: The framework pushes a room event out to the channel's external recipient (send SMS to phone, push to browser via WebSocket). Returns DeliveryResult.
For transport channels, the Event Router calls BOTH:
1. on_event — let the channel react (usually no-op for transport)
2. deliver — push the event to the external recipient
For intelligence channels, only on_event is called (AI doesn't "deliver" to an external system — it produces output back into the room).
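The dispatch order can be sketched with recording stand-ins. This assumes the router awaits channels sequentially and keys the decision on category, as the text above describes; a real router may fan out concurrently. `RecordingChannel` and `dispatch` are hypothetical.

```python
from __future__ import annotations

import asyncio
from enum import Enum


class ChannelCategory(str, Enum):
    TRANSPORT = "transport"
    INTELLIGENCE = "intelligence"


class RecordingChannel:
    """Stand-in channel that records which router calls it received."""

    def __init__(self, channel_id: str, category: ChannelCategory) -> None:
        self.channel_id = channel_id
        self.category = category
        self.calls: list[str] = []

    async def on_event(self, event: str) -> None:
        self.calls.append("on_event")

    async def deliver(self, event: str) -> None:
        self.calls.append("deliver")


async def dispatch(event: str, channels: list[RecordingChannel]) -> None:
    """Transport channels get on_event, then deliver; intelligence channels get on_event only."""
    for channel in channels:
        await channel.on_event(event)
        if channel.category is ChannelCategory.TRANSPORT:
            await channel.deliver(event)


sms = RecordingChannel("sms", ChannelCategory.TRANSPORT)
ai = RecordingChannel("support", ChannelCategory.INTELLIGENCE)
asyncio.run(dispatch("hello", [sms, ai]))
print(sms.calls, ai.calls)  # ['on_event', 'deliver'] ['on_event']
```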
3.9 Real-Time Event System
The framework manages real-time connections and provides an event subscription system.
Two levels of events:
┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 1: Room Events (per-room, stored in timeline) │
│ │
│ Events within a room's conversation: │
│ - message, typing, read_receipt, delivery_status │
│ - channel_attached, channel_detached, channel_muted, channel_unmuted │
│ - participant_joined, participant_left │
│ - task_created, observation │
│ │
│ Stored in Room timeline. Subject to visibility rules. │
│ Delivered to channels with read access in the room. │
└──────────────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 2: Framework Events (global, for external subscribers) │
│ │
│ Events about the framework state: │
│ - room_created, room_closed, room_archived │
│ - channel_registered, channel_unregistered │
│ - channel_connected, channel_disconnected, channel_error │
│ - room_channel_attached, room_channel_detached │
│ - identity_created, identity_linked │
│ │
│ NOT stored in a room timeline. Published to subscribers. │
│ Used by dashboards, monitoring, external systems. │
└──────────────────────────────────────────────────────────────────────────────┘
Real-time transport for browser/WebSocket channels:
import json
from typing import Any
class WebSocketChannel(Channel):
    """Browser-based channel using WebSocket for real-time communication."""
    type = ChannelType.WEBSOCKET
    category = ChannelCategory.TRANSPORT
    direction = ChannelDirection.BIDIRECTIONAL
    media_types = [ChannelMediaType.TEXT]
    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        """User sends a message or typing indicator via WebSocket."""
        data = json.loads(raw_payload)
        if data["type"] == "typing":
            return InboundResult(event=RoomEvent(type=EventType.TYPING, ...))
        if data["type"] == "message":
            return InboundResult(event=RoomEvent(type=EventType.MESSAGE, ...))
        # Reject any other frame type explicitly instead of implicitly returning None
        raise ValueError(f"Unsupported WebSocket message type: {data['type']!r}")
    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        """Push event to browser via WebSocket connection."""
        ws = self._get_connection(binding.participant_id)
        if ws and ws.connected:
            await ws.send_json(self._serialize_event(event))
            return DeliveryResult(status="sent")
        return DeliveryResult(status="failed", error="disconnected")
Typing indicators — full flow:
Browser User types...
→ WebSocket message: {"type": "typing", "room_id": "room_1"}
→ WebSocketChannel.handle_inbound() → RoomEvent(type=TYPING)
→ Event Router broadcasts to channels with read access:
→ Other WebSocket channels: deliver() → push typing indicator to other browsers
→ AI channel: on_event() → may ignore (or use for "user is typing" awareness)
→ SMS channel: skipped, deliver() is not called (SMS doesn't support typing indicators)
(ChannelCapabilities.supports_typing_indicator = False → Event Router skips delivery)
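A minimal sketch of the capability gate described in the flow above. It assumes a hypothetical `Caps` dataclass that mirrors two ChannelCapabilities flags, plus an assumed mapping from ephemeral event types to the flag that gates them. Read-access and source-exclusion checks are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Caps:
    """Hypothetical subset of ChannelCapabilities."""
    supports_typing_indicator: bool = False
    supports_read_receipts: bool = False


# Assumed mapping: event type -> capability flag that must be True for delivery
CAPABILITY_FOR_EVENT = {
    "typing": "supports_typing_indicator",
    "read_receipt": "supports_read_receipts",
}


def delivery_targets(event_type: str, channels: dict[str, Caps]) -> list[str]:
    """Return the channel IDs whose deliver() should be called for this event type."""
    flag = CAPABILITY_FOR_EVENT.get(event_type)
    if flag is None:
        return list(channels)  # not capability-gated: every channel is a target
    return [cid for cid, caps in channels.items() if getattr(caps, flag)]


channels = {
    "ws_advisor": Caps(supports_typing_indicator=True, supports_read_receipts=True),
    "sms_customer": Caps(),
}
print(delivery_targets("typing", channels))   # ['ws_advisor']
print(delivery_targets("message", channels))  # ['ws_advisor', 'sms_customer']
```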
Framework event subscription:
# Subscribe to framework-level events (for dashboards, monitoring)
@fw.on("room_created")
async def on_room_created(event: FrameworkEvent):
    """Notify dashboard that a new room was created."""
    await dashboard_ws.broadcast({"type": "room_created", "room": event.data})
@fw.on("channel_connected")
async def on_channel_connected(event: FrameworkEvent):
    """Monitor channel health."""
    await metrics.record("channel.connected", tags={"type": event.data["channel_type"]})
@fw.on("room_channel_attached")
async def on_channel_added_to_room(event: FrameworkEvent):
    """Notify room participants that a new channel joined."""
    await dashboard_ws.send_to_room(event.data["room_id"], {
        "type": "channel_joined",
        "channel": event.data["channel_info"],
    })
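The `@fw.on(...)` registration style above is ordinary publish/subscribe. The following self-contained sketch shows one way such a decorator could work. `EventBus` and this `FrameworkEvent` dataclass are hypothetical, not RoomKit's actual classes.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

Handler = Callable[["FrameworkEvent"], Awaitable[None]]


@dataclass
class FrameworkEvent:
    """Hypothetical framework-level event: a type name plus a data payload."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)


class EventBus:
    """Minimal pub/sub: @bus.on(name) registers a handler, emit() awaits all of them."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            self._handlers[event_type].append(handler)
            return handler  # return unchanged so the function stays callable
        return register

    async def emit(self, event: FrameworkEvent) -> None:
        # Run all subscribers for this event type concurrently
        await asyncio.gather(*(h(event) for h in self._handlers[event.type]))


bus = EventBus()
seen: list[str] = []


@bus.on("room_created")
async def record(event: FrameworkEvent) -> None:
    seen.append(event.data["room_id"])


asyncio.run(bus.emit(FrameworkEvent("room_created", {"room_id": "room_1"})))
print(seen)  # ['room_1']
```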
Connection management:
class RoomKit:
    async def connect_websocket(self, ws: WebSocket, room_id: str, participant_id: str):
        """Register a WebSocket connection for a participant in a room."""
        ...
    async def disconnect_websocket(self, ws: WebSocket):
        """Clean up on disconnect. Emits channel_disconnected event."""
        ...
    def get_connected_channels(self, room_id: str) -> list[ChannelBinding]:
        """List currently connected channels for a room (with connection status)."""
        ...
3.10 Room & Channel Metadata
Both Rooms and ChannelBindings carry mutable metadata: dict[str, Any] that can be updated at any time via the API:
# Room metadata — set at creation or updated later
room = await fw.create_room(org_id="org_1", metadata={"source": "inbound_sms", "priority": "high"})
await fw.update_room_metadata(room.id, {"language": "fr", "customer_tier": "premium"})
# Binding metadata — set when attaching or updated later
await fw.attach_channel(room.id, "support", access="read_write",
metadata={"persona": "formal", "language": "fr"})
await fw.update_binding_metadata(room.id, "support", {"persona": "casual"})
Metadata is:
- Arbitrary: any JSON-serializable dict
- Mutable: update via API at any time (emits room_updated / channel_updated event)
- Accessible in hooks: room.metadata, binding.metadata available in all hook handlers
- Accessible to AI: AIContext.metadata passes room metadata to the AI provider
- Queryable: find_rooms(org_id, metadata_filter={"priority": "high"}) for room lookup
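The RFC does not specify how metadata_filter matches. A plausible reading is "every filter key is present with an equal value". Below is a hypothetical in-memory sketch of that rule. A SQL-backed store could push the same test down to the database, for example with PostgreSQL's jsonb containment operator `@>`.

```python
from __future__ import annotations

from typing import Any


def matches(metadata: dict[str, Any], metadata_filter: dict[str, Any]) -> bool:
    """True if every key in the filter is present in metadata with an equal value."""
    return all(k in metadata and metadata[k] == v for k, v in metadata_filter.items())


def find_rooms(rooms: list[dict[str, Any]], org_id: str,
               metadata_filter: dict[str, Any] | None = None) -> list[str]:
    """Hypothetical stand-in for find_rooms(): filter by org, then by metadata subset."""
    metadata_filter = metadata_filter or {}
    return [
        r["id"] for r in rooms
        if r["org_id"] == org_id and matches(r["metadata"], metadata_filter)
    ]


rooms = [
    {"id": "r1", "org_id": "org_1", "metadata": {"priority": "high", "language": "fr"}},
    {"id": "r2", "org_id": "org_1", "metadata": {"priority": "low"}},
    {"id": "r3", "org_id": "org_2", "metadata": {"priority": "high"}},
]
print(find_rooms(rooms, "org_1", {"priority": "high"}))  # ['r1']
```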
4. Hook System
Hooks are the framework's middleware. They intercept events in a pipeline.
4.1 Hook Types
Hook
├── name: str
├── trigger: HookTrigger (when does this hook fire?)
│ ├── before_broadcast (before event reaches channels — can BLOCK)
│ ├── after_broadcast (after all channels processed — for logging)
│ ├── on_channel_attached
│ ├── on_channel_detached
│ ├── on_channel_muted
│ ├── on_channel_unmuted
│ ├── on_room_created
│ ├── on_room_closed
│ ├── on_task_created
│ ├── on_identity_ambiguous (identity resolution returned multiple candidates)
│ ├── on_identity_unknown (identity resolution returned no matches)
│ ├── on_participant_identified (pending participant was resolved)
│ └── on_error
├── execution: HookExecution (sync or async?)
│ ├── sync (blocking — event waits for hook to complete)
│ └── async (non-blocking — fire and forget)
├── priority: int (execution order — lower = first)
└── handler: Callable (the function to run)
4.2 Sync Hooks (Blocking)
Sync hooks run before the event is broadcast. They can:
- Allow the event (return HookResult.allow())
- Block the event (return HookResult.block(...)) — optionally inject replacement events + side effects
- Modify the event (return HookResult.modify(new_event))
HookResult model:
HookResult
├── action: "allow" | "block" | "modify"
├── reason: str | None (why blocked/modified — stored on event)
├── modified_event: RoomEvent | None (only for action=modify)
├── inject: list[InjectedEvent] (targeted events sent INSTEAD of blocked event)
├── tasks: list[Task] (side effects — always processed)
└── observations: list[Observation] (side effects — always processed)
InjectedEvent
├── content: EventContent (what to send)
├── target_channel: str | None (send to ONE specific channel)
├── target_channels: list[str] | None (send to MULTIPLE specific channels)
├── visibility: Visibility = all (fallback if no target specified)
└── source_label: str = "system" (identifies the injected event source)
When a hook blocks an event:
1. The original event is stored in the timeline with status=BLOCKED and blocked_by=hook_name (audit trail)
2. The original event is NOT delivered to any channel
3. Each InjectedEvent is delivered to its targeted channel(s) as a new event with status=DELIVERED
4. Tasks and observations are processed as side effects (compliance logging, alerts, etc.)
Example — SIN sensitivity scanner:
import re
# Compiled once at import time: 9 digits, optionally grouped 3-3-3 by "-" or a space
SIN_PATTERN = re.compile(r"\b\d{3}[-\s]?\d{3}[-\s]?\d{3}\b")
@fw.hook(trigger="before_broadcast", execution="sync", priority=0)
async def sensitivity_scanner(event: RoomEvent, room: RoomContext) -> HookResult:
    """Block messages containing sensitive personal data (SIN)."""
    if event.type != EventType.MESSAGE:
        return HookResult.allow()
    # Not every content type carries text (e.g. MediaContent, AudioContent)
    text = getattr(event.content, "text", None)
    if not text:
        return HookResult.allow()
    if SIN_PATTERN.search(text):
        return HookResult.block(
            reason="SIN detected — blocked for security",
            inject=[
                # Notify the sender (customer on SMS).
                # EN: "Your message was blocked. Social insurance numbers cannot
                # be sent by SMS for security reasons."
                InjectedEvent(
                    target_channel=event.source_channel,
                    content=TextContent(
                        text="Votre message a été bloqué. Les numéros d'assurance "
                        "sociale ne peuvent pas être envoyés par SMS pour des "
                        "raisons de sécurité."
                    ),
                ),
                # Notify all OTHER channels (advisor on WebSocket).
                # EN: "The customer tried to send sensitive data (SIN). The message
                # was blocked for security reasons."
                InjectedEvent(
                    target_channels=room.other_channels(event.source_channel),
                    content=TextContent(
                        text="Le client a tenté d'envoyer des données sensibles (NAS). "
                        "Le message a été bloqué pour des raisons de sécurité."
                    ),
                ),
            ],
            observations=[
                Observation(
                    type="sensitivity_violation",
                    data={
                        "pattern": "SIN",
                        "channel": event.source_channel,
                        "room_id": room.id,
                    },
                ),
            ],
        )
    # Credit card: redact instead of blocking
    # (contains_credit_card / redact_credit_card are integrator-provided helpers)
    if contains_credit_card(text):
        redacted_text = redact_credit_card(text)
        return HookResult.modify(event.with_content(TextContent(text=redacted_text)))
    return HookResult.allow()
4.3 Async Hooks (Non-Blocking)
Async hooks run without holding up the event flow (fire and forget), so they cannot block or modify events. They typically fire after broadcast or on lifecycle triggers such as on_task_created:
@fw.hook(trigger="after_broadcast", execution="async")
async def log_to_audit_trail(event: RoomEvent, room: RoomContext):
    await audit_service.log(event)
@fw.hook(trigger="on_task_created", execution="async")
async def dispatch_task(task: Task, room: RoomContext):
    if task.type == "callback":
        await schedule_callback(task)
    elif task.type == "crm_update":
        await update_crm(task)
4.4 Hook Pipeline
            Inbound Event
                  │
                  ▼
┌──────────────────────────────────┐
│ Sync Hooks (ordered by priority) │
│                                  │
│ [0] Sensitivity Scanner          │
│     → allow / block+inject       │
│ [1] Profanity Filter             │
│     → allow / modify (redact)    │
│ [2] Rate Limiter → allow/block   │
│ [3] Language Detector → modify   │
│     (adds metadata.language)     │
└─────────────────┬────────────────┘
                  │
           blocked? ──yes──┐
                  │        │
                  │   ┌────▼───────────────────────────────┐
                  │   │ 1. Store original event with       │
                  │   │    status=BLOCKED, blocked_by=hook │
                  │   │    (audit trail — never delivered) │
                  │   │                                    │
                  │   │ 2. Deliver InjectedEvents to       │
                  │   │    targeted channels               │
                  │   │    → customer gets security notice │
                  │   │    → advisor gets notification     │
                  │   │                                    │
                  │   │ 3. Process side effects            │
                  │   │    → observations (compliance log) │
                  │   │    → tasks (if any)                │
                  │   └────────────────────────────────────┘
                  │
                  ▼ (allowed, possibly modified)
┌──────────────────────────────────┐
│ Event Router                     │
│ Checks permissions:              │
│ - Source has write access?       │
│ - Source is not muted?           │
│ Broadcasts to channels with      │
│ read access, filtered by         │
│ source's write_visibility        │
│ Stored with status=DELIVERED     │
└─────────────────┬────────────────┘
                  │
                  ▼
┌──────────────────────────────────┐
│ Async Hooks (fire and forget)    │
│                                  │
│ [·] Audit Logger                 │
│ [·] Analytics                    │
│ [·] Webhook Notifier             │
└──────────────────────────────────┘
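The sync stage of this pipeline can be sketched as a loop over priority-sorted hooks. The sketch uses pared-down, hypothetical `Event`, `Result`, and `Hook` types rather than RoomKit's models. It also assumes that the first block short-circuits the remaining sync hooks, which the diagram implies but does not state.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Callable, Literal


@dataclass(frozen=True)
class Event:
    """Hypothetical, pared-down RoomEvent."""
    text: str
    status: str = "pending"
    blocked_by: str | None = None


@dataclass
class Result:
    """Hypothetical, pared-down HookResult."""
    action: Literal["allow", "block", "modify"]
    modified_event: Event | None = None
    inject: list[str] = field(default_factory=list)


@dataclass
class Hook:
    name: str
    priority: int
    handler: Callable[[Event], Result]


def run_sync_hooks(event: Event, hooks: list[Hook]) -> tuple[Event, list[str]]:
    """Run hooks lowest priority first; the first block stops the pipeline."""
    for hook in sorted(hooks, key=lambda h: h.priority):
        result = hook.handler(event)
        if result.action == "block":
            # Keep the event as it reached this hook, marked blocked (audit trail)
            return replace(event, status="blocked", blocked_by=hook.name), result.inject
        if result.action == "modify" and result.modified_event is not None:
            event = result.modified_event  # later hooks see the modified event
    return replace(event, status="delivered"), []


def scanner(e: Event) -> Result:
    # Block anything containing the sample SIN
    if "123-456-789" in e.text:
        return Result("block", inject=["security notice"])
    return Result("allow")


def redactor(e: Event) -> Result:
    return Result("modify", modified_event=replace(e, text=e.text.replace("darn", "****")))


hooks = [Hook("redactor", 1, redactor), Hook("scanner", 0, scanner)]
print(run_sync_hooks(Event("my SIN is 123-456-789"), hooks))  # blocked by "scanner"
print(run_sync_hooks(Event("darn it"), hooks))                # text redacted, delivered
```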
4.5 Room-Level vs Global Hooks
# Global hook — applies to ALL rooms
@fw.hook(trigger="before_broadcast", execution="sync")
async def global_pii_blocker(event, room):
    ...
# Room-level hook — applies to ONE specific room
await fw.add_room_hook(
    room_id="room_123",
    trigger="before_broadcast",
    execution="sync",
    handler=custom_compliance_check,
)
4.6 Hook vs Read-Only Channel — When to Use Which?
| | Sync Hook | Async Hook | Read-Only Channel |
|---|---|---|---|
| Can block events | Yes | No | No |
| Can modify events | Yes | No | No |
| Can inject targeted events | Yes (on block) | No | No |
| Can produce tasks | Yes (on block) | Yes | Yes (side effects) |
| Can produce observations | Yes (on block) | Yes | Yes (side effects) |
| Can produce responses | No | No | No (read-only) |
| Needs AI/LLM | Usually no (rule-based) | Maybe | Usually yes |
| Runs | Before broadcast | After broadcast | During broadcast |
Rule of thumb:
- Need to block and replace with targeted responses? → Sync hook with HookResult.block(inject=[...])
- Need to block or modify events (simple)? → Sync hook (fast, rule-based)
- Need to react to events with side effects? → Async hook
- Need AI-powered analysis producing insights? → Channel with access=read (produces side effects only)
- Need AI responses to the conversation? → Channel with access=read_write
5. Event Model
5.1 Room Events
Everything in a Room is a RoomEvent:
RoomEvent
├── id: str
├── room_id: str
├── index: int (sequential per room: 0, 1, 2, ... — for pagination & read tracking)
├── timestamp: datetime
├── type: EventType
│ ├── message (text, media, rich content)
│ ├── participant_joined
│ ├── participant_left
│ ├── channel_attached
│ ├── channel_detached
│ ├── channel_muted
│ ├── channel_unmuted
│ ├── channel_updated (access or visibility changed)
│ ├── task_created
│ ├── observation
│ ├── participant_identified (pending participant was resolved)
│ ├── typing
│ ├── read_receipt
│ ├── delivery_status (sent, delivered, failed)
│ ├── system (errors, notifications)
│ └── custom (extensible — e.g., "custom:your_namespace")
├── status: EventStatus (delivered | blocked | failed)
├── blocked_by: str | None (hook name that blocked it, if status=blocked)
├── source_channel: str (which channel produced this)
├── participant_id: str | None (which human, if applicable)
├── content: EventContent (normalized payload)
├── source: EventSource (original channel-specific data)
├── visibility: Visibility (inherited from source channel's write_visibility)
├── chain_depth: int (0 = human/inbound origin; incremented on channel re-entry)
├── correlation_id: str | None (integrator's external reference — echoed in delivery reports)
└── metadata: dict[str, Any]
Sequential indexing (inspired by Twilio): Every event in a room gets a monotonically increasing index. This enables:
- Efficient pagination: "get events 50-100"
- Read horizon: "participant last read index 42" → events 43+ are unread
- Gap detection: missing index = lost event
- Ordering guarantees without relying on timestamp precision
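Because indices are dense integers, the uses listed above reduce to simple integer arithmetic. The following is a hypothetical sketch over plain lists of indices; a store would run the equivalent range query. Treating "events 50-100" as a half-open window is an assumption.

```python
def unread(last_read_index: int, latest_index: int) -> range:
    """Indices after the participant's read horizon, up to the newest event."""
    return range(last_read_index + 1, latest_index + 1)


def find_gaps(indices: list[int], start: int = 0) -> list[int]:
    """Indices missing from a run that should be contiguous from `start`."""
    if not indices:
        return []
    present = set(indices)
    return [i for i in range(start, max(present) + 1) if i not in present]


def page(indices: list[int], start: int, end: int) -> list[int]:
    """Events in the half-open window [start, end)."""
    return [i for i in indices if start <= i < end]


print(list(unread(42, 45)))        # [43, 44, 45]
print(find_gaps([0, 1, 2, 4, 5]))  # [3]
```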
5.2 Event Content (Normalized Layer)
Channel-agnostic content, organized by media type:
EventContent (discriminated union)
Text media:
├── TextContent { text }
├── RichContent { text, buttons, cards, quick_replies }
Visual/file media:
├── MediaContent { url, mime_type, caption, size_bytes }
├── LocationContent { latitude, longitude, label }
Audio media (Phase 4+):
├── AudioContent { url, duration_seconds, mime_type, size_bytes, transcript }
Video media (Phase 4+):
├── VideoContent { url, duration_seconds, mime_type, size_bytes, thumbnail_url, transcript }
Composite:
├── CompositeContent { parts: list[EventContent] }
│ (e.g., a WhatsApp message with text + image + audio)
System:
└── SystemContent { code, message, data }
Each content type maps to a ChannelMediaType:
- TextContent, RichContent → requires channel with TEXT media type
- AudioContent → requires channel with AUDIO media type
- VideoContent → requires channel with VIDEO media type
- MediaContent → depends on mime_type (e.g., an image can be delivered to a TEXT channel as an attachment)
- CompositeContent → requires channel supporting all parts' media types
The Event Router checks media type compatibility: if a channel doesn't support the content's media type, the event is not delivered to that channel (or a fallback is triggered, e.g., audio → transcript as text).
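A sketch of the compatibility check, assuming a hypothetical `Content` node with a `kind` string. MediaContent is simplified to an image attachment that requires TEXT, following the rule above. A composite requires the union of its parts' media types.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class MediaType(Enum):
    TEXT = "text"
    AUDIO = "audio"
    VIDEO = "video"


@dataclass
class Content:
    """Hypothetical content node: a kind plus child parts for composites."""
    kind: str  # "text", "rich", "media", "audio", "video", or "composite"
    parts: list[Content] = field(default_factory=list)


# Mapping taken from the list above; "media" is simplified to an image attachment
REQUIRED = {
    "text": MediaType.TEXT,
    "rich": MediaType.TEXT,
    "media": MediaType.TEXT,
    "audio": MediaType.AUDIO,
    "video": MediaType.VIDEO,
}


def required_media(content: Content) -> set[MediaType]:
    if content.kind == "composite":
        # A composite needs every media type used by any of its parts
        return set().union(*(required_media(p) for p in content.parts))
    return {REQUIRED[content.kind]}


def can_deliver(content: Content, channel_media: set[MediaType]) -> bool:
    return required_media(content) <= channel_media


sms = {MediaType.TEXT}
whatsapp = set(MediaType)  # TEXT, AUDIO, VIDEO
mixed = Content("composite", [Content("text"), Content("audio")])
print(can_deliver(mixed, whatsapp), can_deliver(mixed, sms))  # True False
```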
5.3 Event Source (Channel-Specific Layer)
Preserves original channel data:
EventSource
├── channel_type: ChannelType
├── provider: str (twilio, sinch, anthropic, etc.)
├── raw_payload: dict[str, Any] (original provider payload — never lost)
├── provider_message_id: str | None
└── channel_data: ChannelData (typed, per-channel structured data)
├── SMSChannelData { from_number, to_number, segments, encoding }
├── EmailChannelData { from, to, cc, subject, thread_id, html_body, attachments }
├── WhatsAppChannelData { wa_id, template, buttons, context }
├── HTTPChannelData { session_id, user_agent, ip_address }
├── AIChannelData { model, agent_name, tokens_used, latency_ms }
└── CustomChannelData { data: dict }
Two layers, one principle: normalize for routing, preserve for specifics.
This is Level 3 of the three-level channel metadata system (see §3.6). Level 1 (channel instance info) and Level 2 (binding metadata) provide static/per-room context. Level 3 captures what happened in each specific event.
6. Channel Capability System
6.1 Capabilities
Each channel declares what it supports. Capabilities are informed by the channel's media_types:
ChannelCapabilities
Text capabilities (channels with TEXT media type):
├── max_text_length: int | None (SMS: 160, WhatsApp: 4096, Email: None)
├── supports_rich_text: bool (Email: yes, SMS: no)
├── supports_buttons: bool
├── max_buttons: int | None
├── supports_cards: bool
├── supports_quick_replies: bool
├── supports_templates: bool
Media capabilities:
├── supports_media: bool
├── supported_media_types: list[str] (["image/jpeg", "image/png", "audio/ogg", ...])
├── max_media_size_bytes: int | None
Audio capabilities (channels with AUDIO media type, Phase 4+):
├── supports_audio: bool
├── max_audio_duration_seconds: int | None
├── supported_audio_formats: list[str] (["ogg", "mp3", "wav"])
Video capabilities (channels with VIDEO media type, Phase 4+):
├── supports_video: bool
├── max_video_duration_seconds: int | None
├── supported_video_formats: list[str] (["mp4", "webm"])
Delivery:
├── supports_threading: bool
├── supports_typing_indicator: bool
├── supports_read_receipts: bool
├── delivery_mode: DeliveryMode (instant, async, batch)
├── rate_limit: RateLimit | None
Extensible:
└── custom: dict[str, Any]
6.2 Capability-Aware AI
When an AI channel responds, the framework provides the target transport channel's capabilities and media types via the AIContext. The AI generates content appropriate for the target at generation time, not via post-processing:
SMS target (text only, 160 chars):
→ AIContext.target_capabilities = {max_text_length: 160, supports_rich_text: false}
→ AIContext.target_media_types = [TEXT]
→ AI prompt includes: "Keep under 160 chars. Plain text. Be direct."
Email target (text, rich):
→ AIContext.target_capabilities = {max_text_length: None, supports_rich_text: true}
→ AIContext.target_media_types = [TEXT]
→ AI prompt includes: "Use formatting. Include details. Professional tone."
WhatsApp target (text + audio + video):
→ AIContext.target_capabilities = {max_text_length: 4096, supports_buttons: true}
→ AIContext.target_media_types = [TEXT, AUDIO, VIDEO]
→ AI prompt includes: "Rich text supported. Buttons available. Can send voice notes."
Browser target (text, rich UI):
→ AIContext.target_capabilities = {supports_buttons: true, supports_cards: true}
→ AIContext.target_media_types = [TEXT]
→ AI prompt includes: "Rich text supported. Buttons and cards available."
The transport channel enforces hard limits as a safety net only. The AI provider receives the full context and decides how to generate.
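The following hypothetical sketch separates the two roles: turning target capabilities into prompt instructions, and enforcing a hard limit on the transport side. `TargetCaps` mirrors a few ChannelCapabilities fields, and the hint wording is illustrative. Plain truncation as the safety net is an assumption; the RFC does not say how the limit is enforced.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class TargetCaps:
    """Hypothetical subset of ChannelCapabilities passed via AIContext."""
    max_text_length: int | None = None
    supports_rich_text: bool = False
    supports_buttons: bool = False
    supports_cards: bool = False


def prompt_hints(caps: TargetCaps) -> list[str]:
    """Turn target capabilities into generation instructions for the AI provider."""
    hints = []
    if caps.max_text_length is not None:
        hints.append(f"Keep under {caps.max_text_length} characters.")
    hints.append("Formatting allowed." if caps.supports_rich_text else "Plain text only.")
    if caps.supports_buttons:
        hints.append("Buttons available.")
    if caps.supports_cards:
        hints.append("Cards available.")
    return hints


def enforce_hard_limit(text: str, caps: TargetCaps) -> str:
    """Transport-side safety net: truncate only if the AI overshot the limit."""
    limit = caps.max_text_length
    return text if limit is None else text[:limit]


sms = TargetCaps(max_text_length=160)
print(prompt_hints(sms))  # ['Keep under 160 characters.', 'Plain text only.']
```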
6.3 Content Transcoding (Non-AI Cross-Channel)
When an AI channel produces content, it generates capability-aware content at prompt time (§6.2). But what about human-to-human cross-channel? A user on WhatsApp sends a rich card with buttons → the SMS user can't display it.
The framework provides a pluggable ContentTranscoder:
class ContentTranscoder(ABC):
    """Converts content for target channels that don't support the original format."""
    async def transcode(
        self,
        content: EventContent,
        source_capabilities: ChannelCapabilities,
        target_capabilities: ChannelCapabilities,
    ) -> EventContent:
        """Convert content to be compatible with the target channel."""
        ...
Default transcoding rules (built-in):
RichContent (buttons + cards) → TextContent
"Choose one: 1. Option A 2. Option B 3. Option C"
CarouselContent → Sequential TextContent
"Product 1: Name - Description | Product 2: Name - Description"
MediaContent → TextContent (with URL)
"[Image] https://example.com/image.jpg"
AudioContent → TextContent (with transcript or link)
"[Voice message] [transcript] or [link]"
LocationContent → TextContent
"[Location] 45.5017 N, 73.5673 W - Office HQ"
The integrator can override with a custom ContentTranscoder for domain-specific rules.
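Two of the default rules above can be sketched as a plain function. The content classes here are simplified hypothetical versions: RichContent keeps only `text` and `buttons`. Raising `ValueError` stands in for the "cannot be represented" case, which the router treats as a failed delivery.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TextContent:
    text: str


@dataclass
class RichContent:
    text: str
    buttons: list[str] = field(default_factory=list)


@dataclass
class LocationContent:
    latitude: float
    longitude: float
    label: str | None = None


def _coord(value: float, pos: str, neg: str) -> str:
    # 45.5017 -> "45.5017 N", -73.5673 -> "73.5673 W"
    return f"{abs(value):.4f} {pos if value >= 0 else neg}"


def transcode_to_text(content: object) -> TextContent:
    """Sketch of two default rules: RichContent and LocationContent -> TextContent."""
    if isinstance(content, TextContent):
        return content
    if isinstance(content, RichContent):
        options = " ".join(f"{i}. {b}" for i, b in enumerate(content.buttons, start=1))
        return TextContent(f"{content.text} {options}".strip())
    if isinstance(content, LocationContent):
        lat = _coord(content.latitude, "N", "S")
        lon = _coord(content.longitude, "E", "W")
        text = f"[Location] {lat}, {lon}"
        return TextContent(f"{text} - {content.label}" if content.label else text)
    raise ValueError(f"No text transcoding for {type(content).__name__}")


print(transcode_to_text(RichContent("Choose one:", ["Option A", "Option B", "Option C"])).text)
# Choose one: 1. Option A 2. Option B 3. Option C
```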
Where transcoding happens in the pipeline:
The Event Router calls ContentTranscoder.transcode() before calling channel.deliver() for each target channel. This means:
1. Event Router receives an event with its original content.
2. For each target channel, the router checks media type compatibility.
3. If the content is incompatible with the target, the router transcodes it.
4. The transcoded content is passed to channel.deliver().
5. If transcoding fails (content cannot be represented), the event is not delivered to that channel and a delivery_status(failed) event is emitted.
AI channels bypass transcoding entirely — they receive the original content and generate capability-aware responses at prompt time (see §6.2).
6.4 Channel Fallback
When delivery to a channel fails, the framework can automatically try another channel (inspired by Sinch and MessageBird):
FallbackPolicy
├── priority: list[str] (ordered channel IDs to try: ["whatsapp", "sms", "email"])
├── timeout: timedelta (wait before trying next channel: e.g., 15 minutes)
├── on_failure: FallbackBehavior (retry_next | give_up | hook)
└── max_retries: int (max channels to try before giving up)
# Set fallback policy on a room (applies to all delivery)
await fw.set_fallback_policy(room_id, FallbackPolicy(
priority=["whatsapp", "sms", "email"],
timeout=timedelta(minutes=15),
on_failure="retry_next",
))
# Or per binding (channel-specific fallback)
await fw.attach_channel(room_id, "whatsapp",
access="read_write",
fallback=FallbackConfig(fallback_channel_id="sms", after=timedelta(minutes=15)),
)
The Event Router handles fallback: if deliver() returns DeliveryResult(status="failed"), the router checks the fallback policy and retries delivery on the next channel in the priority list, trying at most max_retries channels.
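A minimal sketch of the priority walk, assuming a hypothetical `deliver` coroutine that maps a channel ID to "sent" or "failed". The timeout-based variant (waiting for a delivery receipt before moving on) is deliberately omitted.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

# Hypothetical delivery function: channel ID -> "sent" or "failed"
Deliver = Callable[[str], Awaitable[str]]


async def deliver_with_fallback(
    priority: list[str],
    deliver: Deliver,
    max_retries: int | None = None,
) -> str | None:
    """Try channels in priority order and return the first that reports "sent".

    max_retries caps how many channels are attempted (None means all of them).
    Returns None when every attempted channel failed (the give_up outcome).
    """
    candidates = priority if max_retries is None else priority[:max_retries]
    for channel_id in candidates:
        if await deliver(channel_id) == "sent":
            return channel_id
    return None


async def fake_deliver(channel_id: str) -> str:
    # Simulate WhatsApp being unreachable while SMS and email work
    return "failed" if channel_id == "whatsapp" else "sent"


print(asyncio.run(deliver_with_fallback(["whatsapp", "sms", "email"], fake_deliver)))  # sms
```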
6.5 Template Support
WhatsApp and some other channels require pre-approved templates for outbound messages sent outside the 24-hour session window. This is a hard requirement for WhatsApp support.
TemplateContent (EventContent variant)
├── template_id: str (the template identifier)
├── language: str (e.g., "fr", "en")
├── parameters: dict[str, Any] (template variable values)
├── channel_templates: dict[ChannelType, ChannelTemplate] (per-channel template config)
└── fallback: EventContent | None (what to send if template not supported)
# Send a WhatsApp template message
event = RoomEvent(
content=TemplateContent(
template_id="order_confirmation",
language="fr",
parameters={"order_id": "1234", "total": "99.99"},
fallback=TextContent(text="Votre commande #1234 a été confirmée. Total: 99.99$"),
),
)
Channels that support templates (WhatsApp, Viber) use the template. Others fall back to the fallback content.
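The per-channel choice reduces to a small decision, sketched here with simplified hypothetical content classes. Raising when no fallback exists is an assumption; a real router might instead emit a delivery_status(failed) event for that channel.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TextContent:
    text: str


@dataclass
class TemplateContent:
    template_id: str
    language: str
    parameters: dict[str, str] = field(default_factory=dict)
    fallback: TextContent | None = None


def resolve_for_channel(content: TemplateContent, supports_templates: bool):
    """Keep the template where the channel supports it; otherwise use the fallback."""
    if supports_templates:
        return content
    if content.fallback is None:
        raise ValueError(f"Template {content.template_id!r} has no fallback")
    return content.fallback


tpl = TemplateContent(
    "order_confirmation", "fr", {"order_id": "1234", "total": "99.99"},
    fallback=TextContent("Votre commande #1234 a été confirmée. Total: 99.99$"),
)
print(type(resolve_for_channel(tpl, supports_templates=False)).__name__)  # TextContent
```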
7. Participant & Identity
7.1 Participant
A Participant is a human (or system identity) in the Room:
Participant
├── id: str
├── role: ParticipantRole (end_user, agent, system)
├── identity: Identity | None (None if pending identification)
├── identification: IdentificationStatus (identified | pending)
├── candidates: list[Identity] | None (set when pending — the possible identities)
├── connected_via: list[str] (channel IDs this participant uses)
├── status: ParticipantStatus (active, idle, left)
├── display_name: str | None (resolved name, or a fallback such as "Non identifié (+1555...)", French for "Unidentified")
├── resolved_at: datetime | None (when identification was confirmed)
├── resolved_by: str | None (who resolved: "auto", "advisor", "customer", hook name)
└── joined_at: datetime
Identification status:
| Status | Meaning |
|---|---|
| identified | Identity resolved — we know who this participant is |
| pending | Identity ambiguous or unknown — waiting for resolution |
When a participant is pending:
- Messages are still delivered and stored (the conversation isn't blocked)
- candidates lists the possible identities (e.g., 3 family members)
- The advisor can resolve via API or the system can auto-resolve via hook
- Once resolved, identification=identified, identity is set, resolved_at/resolved_by are filled
- A participant_identified event is emitted in the timeline
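The resolution step can be sketched as a state transition on the participant record. This is a hypothetical in-memory model, not RoomKit's Participant class. Restricting the choice to the stored candidates (when there are any) is an assumption drawn from the REST endpoint's "advisor picks from candidates" note. The returned dict stands in for the participant_identified timeline event.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Participant:
    """Hypothetical, pared-down participant record."""
    id: str
    identification: str = "pending"  # "identified" | "pending"
    identity_id: str | None = None
    candidates: list[str] = field(default_factory=list)  # candidate identity IDs
    resolved_at: datetime | None = None
    resolved_by: str | None = None


def resolve_participant(p: Participant, identity_id: str, resolved_by: str) -> dict:
    """Mark a pending participant as identified and return the event to emit."""
    if p.identification != "pending":
        raise ValueError(f"participant {p.id} is already identified")
    if p.candidates and identity_id not in p.candidates:
        raise ValueError(f"{identity_id} is not one of the candidates")
    p.identification = "identified"
    p.identity_id = identity_id
    p.resolved_at = datetime.now(timezone.utc)
    p.resolved_by = resolved_by  # "auto", "advisor", "customer", or a hook name
    return {"type": "participant_identified", "participant_id": p.id,
            "identity_id": identity_id, "resolved_by": resolved_by}


p = Participant("p1", candidates=["id_a", "id_b"])
print(resolve_participant(p, "id_b", resolved_by="advisor")["type"])  # participant_identified
```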
Important distinction:
- Participants = who is in the conversation (humans)
- Channels = how they connect and what processes events
- A Participant connects via one or more transport channels
- AI and integration channels are NOT participants — they're infrastructure
- The Room's participant list answers "who is in this conversation?"
7.2 Identity
Cross-channel identity resolution:
Identity
├── id: str
├── organization_id: str
├── display_name: str | None
├── channel_addresses: dict[ChannelType, list[str]]
│ (e.g., {sms: ["+15551234"], email: ["john@example.com"]})
├── external_id: str | None (CRM contact ID)
└── metadata: dict[str, Any]
Pluggable resolution via IdentityResolver ABC:
- Address-based (match by phone/email)
- CRM-based (look up in external system)
- Explicit (integrator provides identity)
- Manual (human agent links identities)
7.3 Identity Resolution Pipeline
When an inbound event arrives (e.g., SMS webhook), the framework runs the identity resolution pipeline:
Inbound arrives (channel_type=SMS, address="+15551234567")
                    │
                    ▼
┌───────────────────────────────────────────────────────────────┐
│ IdentityResolver.resolve(channel_type, address, channel_data) │
│                                                               │
│ Returns: IdentityResult                                       │
│   status: "resolved" | "ambiguous" | "unknown"                │
│   participant: Participant | None  (if resolved)              │
│   candidates: list[Identity]       (if ambiguous)             │
│   address: str                     (raw channel address)      │
│   channel_type: ChannelType                                   │
└───────────────────┬───────────────────────────────────────────┘
                    │
    ┌───────────────┼─────────────────────────┐
    ▼               ▼                         ▼
RESOLVED        AMBIGUOUS                  UNKNOWN
(1 match)      (N matches)               (0 matches)
    │               │                         │
    │               ▼                         ▼
    │     on_identity_ambiguous      on_identity_unknown
    │           sync hook                 sync hook
    │               │                         │
    │        Returns one of:           Returns one of:
    │        • resolved(identity)      • create(new_identity)
    │        • pending(candidates)     • pending()
    │        • challenge(inject)       • challenge(inject)
    │        • reject()                • reject()
    │               │                         │
    ▼               ▼                         ▼
    Proceed with identified            Proceed with pending
    or pending participant             or rejected
IdentityResolver ABC:
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Literal
# kw_only=True lets fields with defaults precede required ones (Python 3.10+)
@dataclass(kw_only=True)
class IdentityResult:
    """Result of identity resolution — can be resolved, ambiguous, or unknown."""
    status: Literal["resolved", "ambiguous", "unknown"]
    participant: Participant | None = None  # set if status=resolved
    candidates: list[Identity] = field(default_factory=list)  # set if status=ambiguous (2+)
    address: str  # raw channel address (+15551234567)
    channel_type: ChannelType  # SMS, email, whatsapp, etc.
class IdentityResolver(ABC):
    """Pluggable identity resolution — integrator implements this."""
    @abstractmethod
    async def resolve(
        self,
        channel_type: ChannelType,
        address: str,
        channel_data: dict[str, Any],
    ) -> IdentityResult:
        """Resolve a channel address to a participant.
        Examples:
        - CRM lookup: SELECT * FROM contacts WHERE phone = address
        - Address book lookup: match by phone or email
        - External API: call CRM service
        """
        ...
Identity hook result:
# Postponed evaluation lets the methods name IdentityHookResult inside its own class body
from __future__ import annotations
class IdentityHookResult:
    """What the integrator's hook returns when identity is ambiguous or unknown."""
    @staticmethod
    def resolved(identity: Identity) -> IdentityHookResult:
        """Resolved — we know who this is. Proceed with identified participant."""
        ...
    @staticmethod
    def pending(
        display_name: str | None = None,
        candidates: list[Identity] | None = None,
    ) -> IdentityHookResult:
        """Pending — create participant with status=pending. Advisor resolves later."""
        ...
    @staticmethod
    def challenge(inject: InjectedEvent) -> IdentityHookResult:
        """Challenge — hold the message, ask the sender to self-identify."""
        ...
    @staticmethod
    def reject(reason: str = "Unknown sender") -> IdentityHookResult:
        """Reject — do not create room or participant."""
        ...
Example — CRM resolver with ambiguity handling:
# 1. Integrator implements the resolver (CRM lookup)
class CRMIdentityResolver(IdentityResolver):
    def __init__(self, crm_client: CRMClient):
        self.crm = crm_client
    # _to_participant / _to_identity are integrator-defined mapping helpers (not shown)
    async def resolve(self, channel_type, address, channel_data) -> IdentityResult:
        contacts = await self.crm.find_contacts_by_phone(address)
        if len(contacts) == 1:
            return IdentityResult(
                status="resolved",
                participant=self._to_participant(contacts[0]),
                address=address,
                channel_type=channel_type,
            )
        elif len(contacts) > 1:
            return IdentityResult(
                status="ambiguous",
                candidates=[self._to_identity(c) for c in contacts],
                address=address,
                channel_type=channel_type,
            )
        else:
            return IdentityResult(
                status="unknown",
                address=address,
                channel_type=channel_type,
            )
# 2. Integrator registers the resolver
fw = RoomKit(
    store=PostgresStore(url="..."),
    identity_resolver=CRMIdentityResolver(crm_client),
)
# 3. Integrator handles ambiguity (one hook, any strategy)
@fw.hook(trigger="on_identity_ambiguous", execution="sync")
async def handle_ambiguous(
    event: InboundResult,
    resolution: IdentityResult,
) -> IdentityHookResult:
    # Strategy: check for recent active room with any candidate
    for candidate in resolution.candidates:
        recent = await fw.store.find_latest_room(
            participant_id=candidate.id,
            status="active",
        )
        if recent:
            return IdentityHookResult.resolved(identity=candidate)
    # No recent context — let advisor decide
    return IdentityHookResult.pending(
        display_name=f"Non identifié ({resolution.address})",  # "Unidentified (...)"
        candidates=resolution.candidates,
    )
8. Integration Surfaces
8.1 REST API
For humans, dashboards, and external systems:
Rooms
POST /rooms Create room (with metadata)
GET /rooms/{id} Get room (includes channel info, metadata)
PATCH /rooms/{id} Update room (status, metadata)
DELETE /rooms/{id} Close room
GET /rooms?org_id=X&status=active List rooms (filterable by metadata)
Channels (dynamic, with permissions + metadata)
POST /rooms/{id}/channels Attach channel (access, visibility, metadata)
DELETE /rooms/{id}/channels/{cid} Detach channel
PATCH /rooms/{id}/channels/{cid} Update access, visibility, muted, metadata
POST /rooms/{id}/channels/{cid}/mute Mute channel
POST /rooms/{id}/channels/{cid}/unmute Unmute channel
GET /rooms/{id}/channels List channels (permissions + info + direction + media_types)
Registered Channels (framework-level)
GET /channels List registered channels (type, direction, media_types, info)
GET /channels/{id} Get channel details (capabilities, info, status)
Events & Timeline
POST /rooms/{id}/events Send event into room
GET /rooms/{id}/timeline Get timeline (paginated)
GET /rooms/{id}/timeline?visibility=all Filter by visibility
Participants
POST /rooms/{id}/participants Add participant
DELETE /rooms/{id}/participants/{pid} Remove participant
GET /rooms/{id}/participants List participants (includes identification status)
POST /rooms/{id}/participants/{pid}/resolve Resolve pending participant
body: { "identity_id": "..." } (advisor picks from candidates)
GET /rooms/{id}/participants?identification=pending List unresolved participants
Tasks & Observations
GET /rooms/{id}/tasks Room tasks
GET /tasks?org_id=X&status=pending All pending tasks
PATCH /tasks/{id} Update task
GET /rooms/{id}/observations Room observations
Hooks
POST /rooms/{id}/hooks Add room-level hook
DELETE /rooms/{id}/hooks/{hid} Remove hook
GET /rooms/{id}/hooks List hooks
Identity
POST /identities Create
GET /identities/resolve Resolve by channel+address
PATCH /identities/{id} Link addresses
Webhooks (inbound from providers)
POST /webhooks/{channel_type}/{provider} Inbound message
POST /webhooks/{channel_type}/{provider}/status Delivery status
Real-time (WebSocket)
WS /ws/{room_id} WebSocket connection for a room participant
GET /rooms/{id}/connections List active connections in room
8.2 MCP Server
For AI agents to interact with Rooms natively:
MCP Tools (actions):
send_message(room_id, text) Send a message into a Room
create_task(room_id, type, action, data) Create a task
add_observation(room_id, type, data) Log an observation
attach_channel(room_id, channel_config) Add a channel
detach_channel(room_id, channel_id) Remove a channel
mute_channel(room_id, channel_id) Mute a channel
unmute_channel(room_id, channel_id) Unmute a channel
set_channel_visibility(room_id, cid, vis) Change visibility
update_room_metadata(room_id, key, value) Enrich Room data
resolve_identity(channel, address) Look up identity
escalate_to_human(room_id, reason) Request human handoff
MCP Resources (data):
room://{room_id} Room details
room://{room_id}/timeline Conversation history
room://{room_id}/participants Who is in the room
room://{room_id}/channels Attached channels + permissions
room://{room_id}/tasks Tasks for this room
identity://{identity_id} Identity details
8.3 Both Surfaces, Same Core
# REST and MCP both call the same RoomKit core:
# REST:
@router.post("/rooms/{room_id}/channels/{cid}/mute")
async def mute_channel(room_id: str, cid: str):
    return await fw.mute(room_id, cid)
# MCP:
@mcp.tool()
async def mute_channel(room_id: str, channel_id: str):
    return await fw.mute(room_id, channel_id)
# Same fw.mute() underneath — same hooks, same timeline event, same store.
8.4 Inbound Room Routing¶
When an inbound event arrives (e.g., SMS webhook), the framework must decide which Room to route it to. This is process_inbound's responsibility:
Inbound arrives (channel_id, raw_payload)
|
v
Channel.handle_inbound(raw_payload) -> InboundResult
|
v
IdentityResolver.resolve(channel_type, address) -> IdentityResult
|
v
Room Routing Strategy (pluggable):
|
+-- Default: find latest active Room where this participant
| is connected via the same channel type.
| If found -> route to existing Room.
| If not found -> create new Room, fire on_room_created hook.
|
+-- Custom: integrator provides a RoomRouter implementation:
async def route(participant, channel_type, channel_data) -> Room | None
Returns existing Room or None (framework creates new Room).
RoomRouter ABC:
from abc import ABC, abstractmethod
from typing import Any
# Participant, ChannelType and Room are RoomKit model types.
class RoomRouter(ABC):
    """Pluggable room routing -- integrator decides which room an inbound goes to."""
    @abstractmethod
    async def route(
        self,
        participant: Participant,
        channel_type: ChannelType,
        channel_data: dict[str, Any],
    ) -> Room | None:
        """Return an existing Room to route to, or None to create a new Room.
        Examples:
        - Find latest active room for this participant (default behavior)
        - Match by external ticket ID in channel_data
        - Always create a new room (stateless mode)
        """
        ...
The default implementation (DefaultRoomRouter) finds the latest active Room where the participant is connected via the same channel type. Integrators can override this for ticket-based routing, round-robin assignment, etc.
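The default strategy and a ticket-based override can be sketched as follows. This is a simplified illustration, not RoomKit's DefaultRoomRouter. It takes a participant ID and a channel-type string instead of the Participant and ChannelType objects in the ABC above. It keeps rooms in a list instead of querying the store. It also assumes a ticket_id key in both channel_data and room metadata.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional


# Hypothetical stand-ins; RoomKit's real Room / ChannelBinding types are richer.
@dataclass
class Binding:
    channel_type: str
    participant_id: str


@dataclass
class Room:
    id: str
    status: str                      # "active" or "closed"
    last_activity: datetime
    bindings: list[Binding] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)


class LatestActiveRoomRouter:
    """Default-style strategy: newest active room where the participant is
    already bound through the same channel type."""

    def __init__(self, rooms: list[Room]):
        self.rooms = rooms           # stands in for a ConversationStore query

    async def route(self, participant_id: str, channel_type: str,
                    channel_data: dict[str, Any]) -> Optional[Room]:
        candidates = [
            room for room in self.rooms
            if room.status == "active"
            and any(b.participant_id == participant_id and b.channel_type == channel_type
                    for b in room.bindings)
        ]
        # None tells the framework to create a new Room.
        return max(candidates, key=lambda room: room.last_activity, default=None)


class TicketRouter(LatestActiveRoomRouter):
    """Ticket-based override: an external ticket ID in channel_data wins."""

    async def route(self, participant_id, channel_type, channel_data):
        ticket_id = channel_data.get("ticket_id")
        if ticket_id is None:
            return await super().route(participant_id, channel_type, channel_data)
        for room in self.rooms:
            if room.status == "active" and room.metadata.get("ticket_id") == ticket_id:
                return room
        return None                  # unknown ticket: let the framework open a new room
```

When the override sees an unknown ticket, it returns None, so the framework creates a room instead of falling back to the participant's latest room. Whether that is right depends on whether two tickets should ever share a conversation.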
on_room_created hook behavior:
The on_room_created hook fires after the Room is persisted but before the inbound event is processed. This means:
- The Room exists in the store and has an ID.
- The hook can safely call fw.attach_channel() — no re-entrancy issue because the room lock is not held during hook execution.
- The inbound event is queued and processed after the hook completes.
- Use execution="async" for on_room_created hooks to avoid blocking the inbound pipeline.
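The ordering guarantees above can be illustrated with a small asyncio model. MiniFramework and its single lock are hypothetical simplifications; the RFC describes a RoomLockManager with per-room locks. The point is that the hook calls attach_channel() while the lock is released. Because asyncio.Lock is not reentrant, that same call would deadlock if the lock were still held. The hook is awaited inline here, which matches the "processed after the hook completes" guarantee.

```python
import asyncio
from typing import Awaitable, Callable


class MiniFramework:
    """Hypothetical model of: persist room -> run on_room_created -> process inbound."""

    def __init__(self, on_room_created: Callable[["MiniFramework", str], Awaitable[None]]):
        self.on_room_created = on_room_created
        self.rooms: dict[str, list[str]] = {}   # room_id -> attached channel ids
        self.log: list[str] = []
        self.lock = asyncio.Lock()              # asyncio.Lock is not reentrant

    async def attach_channel(self, room_id: str, channel_id: str) -> None:
        async with self.lock:
            self.rooms[room_id].append(channel_id)
            self.log.append(f"attach {channel_id}")

    async def process_inbound(self, room_id: str, text: str) -> None:
        async with self.lock:
            created = room_id not in self.rooms
            if created:
                self.rooms[room_id] = []        # 1. room persisted, has an ID
                self.log.append("room created")
        if created:
            # 2. hook runs with the lock released, so it may call attach_channel()
            await self.on_room_created(self, room_id)
        async with self.lock:
            # 3. the queued inbound event is processed only after the hook completes
            self.log.append(f"inbound {text!r} -> {self.rooms[room_id]}")
```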
9. Event Flow — Complete Scenario¶
Customer (SMS) + AI → Advisor joins → AI becomes assistant → AI unmuted¶
STEP 1: Room created with SMS + AI
─────────────────────────────────
Customer sends SMS: "Bonjour, j'ai besoin d'aide avec mon compte"
→ Twilio webhook → POST /webhooks/sms/twilio
→ SMS Channel parses, resolves identity
→ Framework: no open Room → create_room()
→ on_room_created hook (integrator logic):
attach AI channel: access=read_write, visibility=all, muted=false
→ RoomEvent(type=message) added to timeline
Room channels:
┌──────────┬────────────┬─────────┬────────────┐
│ Channel │ Access │ Muted │ Visibility │
├──────────┼────────────┼─────────┼────────────┤
│ SMS │ read_write │ false │ all │
│ AI │ read_write │ false │ all │
└──────────┴────────────┴─────────┴────────────┘
→ Event Router broadcasts to AI (read access ✓)
→ AI responds: "Bonjour! Comment puis-je vous aider?"
→ AI has write access ✓, not muted ✓, visibility=all
→ SMS Channel delivers to customer ✓
STEP 2: Advisor joins — business logic mutes AI
────────────────────────────────────────────────
Advisor opens dashboard → WebSocket to room
→ on_channel_attached hook (integrator logic):
if new_channel is agent transport:
fw.mute(room_id, ai_channel_id) # AI can't talk
fw.set_visibility(room_id, ai_channel_id, channels(["http"])) # when unmuted, only agents see
Room channels:
┌──────────┬────────────┬─────────┬────────────────┐
│ Channel │ Access │ Muted │ Visibility │
├──────────┼────────────┼─────────┼────────────────┤
│ SMS │ read_write │ false │ all │
│ AI │ read_write │ true │ channels(http) │ ← muted + visibility changed
│ HTTP │ read_write │ false │ all │ ← new
└──────────┴────────────┴─────────┴────────────────┘
Customer SMS: "Je ne peux plus me connecter"
→ Event Router:
→ SMS: source, skip
→ AI: has read access ✓ → receives event → produces ChannelOutput:
events: ["Il a eu 3 tentatives échouées. Suggérer reset mot de passe."]
BUT AI is muted → events are BLOCKED by Event Router
tasks: [Task(type="insight", data="3 failed logins")]
observations: [Observation(type="intent", value="login_issue")]
→ Side effects ARE stored (not subject to mute)
→ HTTP: pushes customer message to advisor's browser ✓
Advisor sees:
- Customer: "Je ne peux plus me connecter"
- [Task from AI: 3 failed login attempts detected] (via REST API / dashboard)
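The mute rule in Step 2 reduces to a few lines: events are blocked, but side effects are kept. ChannelOutput, RoomState and apply_output below are hypothetical simplifications that use plain strings. They are not RoomKit's classes.

```python
from dataclasses import dataclass, field


@dataclass
class ChannelOutput:
    events: list[str] = field(default_factory=list)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)


@dataclass
class RoomState:
    delivered: list[str] = field(default_factory=list)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)


def apply_output(room: RoomState, output: ChannelOutput, muted: bool) -> list[str]:
    """Store side effects unconditionally; let events through only if not muted."""
    room.tasks.extend(output.tasks)
    room.observations.extend(output.observations)
    if muted:
        return []                    # events blocked by the Event Router
    room.delivered.extend(output.events)
    return list(output.events)
```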
STEP 3: Advisor unmutes AI as assistant
───────────────────────────────────────
Advisor clicks "Enable AI suggestions" in dashboard
→ API: POST /rooms/{id}/channels/{ai_id}/unmute
→ AI is no longer muted, but visibility = channels(["http"])
Room channels:
┌──────────┬────────────┬─────────┬────────────────┐
│ Channel │ Access │ Muted │ Visibility │
├──────────┼────────────┼─────────┼────────────────┤
│ SMS │ read_write │ false │ all │
│ AI │ read_write │ false │ channels(http) │ ← unmuted, but only agents see
│ HTTP │ read_write │ false │ all │
└──────────┴────────────┴─────────┴────────────────┘
Customer SMS: "Est-ce que je peux changer mon mot de passe?"
→ AI responds: "Le client peut reset via Paramètres > Sécurité > Réinitialiser"
→ AI write access ✓, not muted ✓, visibility=channels(["http"])
→ HTTP Channel delivers AI suggestion to advisor ✓
→ SMS Channel does NOT receive it (not in visibility list) ✓
Advisor sees: [AI suggestion] "Le client peut reset via Paramètres > Sécurité..."
Customer sees: nothing from AI
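Steps 1 to 3 apply three checks to each target. The source must be able to write (write access, not muted), the target must be able to read, and the target's channel type must be in the source's visibility. The sketch below follows those assumptions. The Binding fields and the access strings "read", "write" and "read_write" are illustrative. Intelligence channels that receive events through on_event() are treated like any other reader.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Binding:
    channel_id: str
    channel_type: str                                # e.g. "sms", "http", "ai"
    access: str = "read_write"                       # "read", "write" or "read_write"
    muted: bool = False
    visibility: Union[str, tuple[str, ...]] = "all"  # "all" or allowed channel types


def can_write(source: Binding) -> bool:
    return source.access in ("write", "read_write") and not source.muted


def can_read(target: Binding) -> bool:
    return target.access in ("read", "read_write")


def is_visible_to(source: Binding, target: Binding) -> bool:
    return source.visibility == "all" or target.channel_type in source.visibility


def delivery_targets(source: Binding, bindings: list[Binding]) -> list[str]:
    """Channel ids that receive a message event written by `source`."""
    if not can_write(source):
        return []                                    # blocked; side effects handled elsewhere
    return [
        b.channel_id for b in bindings
        if b.channel_id != source.channel_id         # never echo back to the source
        and can_read(b)
        and is_visible_to(source, b)
    ]
```

With the Step 3 bindings, an AI message reaches only the advisor's HTTP channel. A customer SMS reaches both the AI and the advisor, because mute restricts writing, not reading.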
STEP 4: Advisor responds manually
──────────────────────────────────
Advisor types: "Allez dans Paramètres, puis Sécurité, puis Réinitialiser le mot de passe."
→ HTTP source, visibility=all
→ SMS delivers to customer ✓
→ AI receives (read access), produces observation: intent=password_reset ✓
STEP 5: Advisor lets AI respond directly
────────────────────────────────────────
Advisor clicks "Let AI respond to customer" in dashboard
→ API: PATCH /rooms/{id}/channels/{ai_id} body: {visibility: "all"}
Room channels:
┌──────────┬────────────┬─────────┬────────────┐
│ Channel │ Access │ Muted │ Visibility │
├──────────┼────────────┼─────────┼────────────┤
│ SMS │ read_write │ false │ all │
│ AI │ read_write │ false │ all │ ← now customer sees AI again
│ HTTP │ read_write │ false │ all │
└──────────┴────────────┴─────────┴────────────┘
AI can now respond directly to customer via SMS.
STEP 6: Advisor leaves
──────────────────────
→ on_channel_detached hook: no special logic (AI already has visibility=all)
→ Room continues with SMS + AI, as in Step 1.
Timeline:
┌────┬──────────┬─────────────────────────────────────────────┬───────────┬──────────┐
│ # │ Source │ Content │ Type │ Visible │
├────┼──────────┼─────────────────────────────────────────────┼───────────┼──────────┤
│ 1 │ system │ Room created │ system │ internal │
│ 2 │ system │ AI channel attached (rw, all) │ ch_attach │ internal │
│ 3 │ SMS │ "Bonjour, j'ai besoin d'aide" │ message │ all │
│ 4 │ AI │ "Bonjour! Comment puis-je vous aider?" │ message │ all │
│ 5 │ system │ HTTP channel attached (rw, all) │ ch_attach │ internal │
│ 6 │ system │ AI channel muted │ ch_muted │ internal │
│ 7 │ system │ AI visibility → channels(http) │ ch_update │ internal │
│ 8 │ SMS │ "Je ne peux plus me connecter" │ message │ all │
│ 9 │ AI │ Task: 3 failed logins detected │ task │ internal │
│ 10 │ system │ AI channel unmuted │ ch_unmute │ internal │
│ 11 │ SMS │ "Est-ce que je peux changer mon mdp?" │ message │ all │
│ 12 │ AI │ "Reset via Paramètres > Sécurité" │ message │ http │
│ 13 │ HTTP │ "Allez dans Paramètres, puis Sécurité..." │ message │ all │
│ 14 │ system │ AI visibility → all │ ch_update │ internal │
│ 15 │ system │ HTTP channel detached │ ch_detach │ internal │
└────┴──────────┴─────────────────────────────────────────────┴───────────┴──────────┘
Sensitivity Scanning — Block SIN + Targeted Responses (Use Case 6)¶
A customer on SMS sends their Social Insurance Number to a financial advisor on WebSocket. A sync hook intercepts, blocks the message, and injects targeted responses to both parties.
SETUP: Room with SMS (customer) + WebSocket (advisor)
──────────────────────────────────────────────────────
Room: "advisor-session-42"
┌──────────┬────────────┬─────────┬────────────┐
│ Channel │ Access │ Muted │ Visibility │
├──────────┼────────────┼─────────┼────────────┤
│ SMS │ read_write │ false │ all │ ← customer
│ WS │ read_write │ false │ all │ ← financial advisor
└──────────┴────────────┴─────────┴────────────┘
Global sync hook registered: sensitivity_scanner (priority=0)
STEP 1: Customer sends SIN via SMS
───────────────────────────────────
Customer SMS: "Bonjour, mon NAS est 123-456-789"
→ Twilio webhook → POST /webhooks/sms/twilio
→ SMSChannel.handle_inbound() → InboundResult(event)
→ Event enters pipeline, gets index assigned (index=5)
→ Sync Hook Pipeline:
→ [0] sensitivity_scanner:
- Inspects event.content.text
- Regex matches SIN pattern: \b\d{3}[-\s]?\d{3}[-\s]?\d{3}\b ✓
- Returns HookResult.block(...)
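A hook body for Step 1 might look like the sketch below. The real hook receives a RoomEvent and returns HookResult.block(...). Here the inputs are plain strings and HookResult/InjectedEvent are hypothetical dataclasses, so only the decision logic carries over. The regex is the one from the scenario. It matches any group of nine digits, so false positives are expected.

```python
import re
from dataclasses import dataclass, field
from typing import Optional

# Pattern from the scenario above; it also matches unrelated 9-digit numbers.
SIN_PATTERN = re.compile(r"\b\d{3}[-\s]?\d{3}[-\s]?\d{3}\b")


@dataclass
class InjectedEvent:
    text: str
    target_channels: list[str]


@dataclass
class HookResult:
    action: str                                   # "allow" or "block"
    reason: Optional[str] = None
    injected: list[InjectedEvent] = field(default_factory=list)
    observations: list[dict] = field(default_factory=list)


def sensitivity_scanner(text: str, source_channel: str, room_channels: list[str]) -> HookResult:
    """Sync hook body: block SIN-looking messages and notify each side differently."""
    if not SIN_PATTERN.search(text):
        return HookResult(action="allow")
    others = [c for c in room_channels if c != source_channel]
    return HookResult(
        action="block",
        reason="sin_detected",
        injected=[
            InjectedEvent("Votre message a été bloqué...", [source_channel]),
            InjectedEvent("Le client a tenté d'envoyer des données sensibles (NAS)...", others),
        ],
        observations=[{"type": "sensitivity_violation", "pattern": "SIN", "channel": source_channel}],
    )
```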
STEP 2: Framework processes the block
──────────────────────────────────────
1. STORE original event with status=BLOCKED:
RoomEvent(
index=5,
type=message,
content="Bonjour, mon NAS est 123-456-789",
source_channel="sms",
status=BLOCKED, ← marked as blocked
blocked_by="sensitivity_scanner", ← which hook blocked it
)
→ Stored in timeline for audit trail
→ NOT delivered to any channel
2. DELIVER injected events:
InjectedEvent #1 → target_channel="sms" (customer):
RoomEvent(
index=6,
type=message,
content="Votre message a été bloqué. Les numéros d'assurance
sociale ne peuvent pas être envoyés par SMS pour des
raisons de sécurité.",
source_channel="system",
status=DELIVERED,
)
→ SMSChannel.deliver() → Twilio sends SMS to customer ✓
InjectedEvent #2 → target_channels=[all except "sms"] (advisor):
RoomEvent(
index=7,
type=message,
content="Le client a tenté d'envoyer des données sensibles (NAS).
Le message a été bloqué pour des raisons de sécurité.",
source_channel="system",
status=DELIVERED,
)
→ WebSocketChannel.deliver() → pushes to advisor's browser ✓
3. PROCESS side effects:
Observation(
type="sensitivity_violation",
data={"pattern": "SIN", "channel": "sms", "room_id": "advisor-session-42"}
)
→ Stored + dispatched to on_observation hooks (compliance logging)
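The block handling in parts 1 and 2 of Step 2 can be sketched as a single function. This is a simplification for illustration. Indexes are derived from the timeline length, which assumes contiguous numbering, and injected events are (text, target_channels) pairs. The observation side effect from part 3 is left out.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StoredEvent:
    index: int
    text: str
    source_channel: str
    status: str                          # "delivered" or "blocked"
    blocked_by: Optional[str] = None
    targets: list[str] = field(default_factory=list)


def process_block(timeline: list[StoredEvent], text: str, source: str, hook_name: str,
                  injected: list[tuple[str, list[str]]]) -> list[tuple[str, str]]:
    """Store the blocked original for audit, then store and route each injected event.

    Returns (channel, text) deliveries; the blocked original never appears in them.
    """
    index = len(timeline) + 1
    timeline.append(StoredEvent(index, text, source, "blocked", blocked_by=hook_name))
    deliveries: list[tuple[str, str]] = []
    for message, targets in injected:
        index += 1
        timeline.append(StoredEvent(index, message, "system", "delivered", targets=list(targets)))
        deliveries.extend((channel, message) for channel in targets)
    return deliveries
```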
STEP 3: What each party sees
─────────────────────────────
Customer (SMS):
✗ Their original message was NOT echoed back (standard SMS behavior)
✓ Receives: "Votre message a été bloqué. Les numéros d'assurance sociale
ne peuvent pas être envoyés par SMS..."
Advisor (WebSocket):
✗ Does NOT see "Bonjour, mon NAS est 123-456-789" (blocked)
✓ Sees: "Le client a tenté d'envoyer des données sensibles (NAS)..."
Compliance team (via API / async hooks):
✓ Can query blocked events: GET /api/v1/rooms/{id}/events?status=blocked
✓ Observation logged: sensitivity_violation with full context
Timeline:
┌────┬──────────┬──────────────────────────────────────────────┬──────────┬──────────┬─────────┐
│ # │ Source │ Content │ Type │ Status │ Visible │
├────┼──────────┼──────────────────────────────────────────────┼──────────┼──────────┼─────────┤
│ 1 │ system │ Room created │ system │ delivered│ internal│
│ 2 │ system │ SMS channel attached (rw, all) │ ch_attach│ delivered│ internal│
│ 3 │ system │ WS channel attached (rw, all) │ ch_attach│ delivered│ internal│
│ 4 │ SMS │ "Bonjour, j'ai une question..." │ message │ delivered│ all │
│ 5 │ SMS │ "Bonjour, mon NAS est 123-456-789" │ message │ BLOCKED │ — │
│ 6 │ system │ "Votre message a été bloqué..." │ message │ delivered│ sms │
│ 7 │ system │ "Le client a tenté d'envoyer..." │ message │ delivered│ ws │
│ │ │ Observation: sensitivity_violation │ observ. │ — │ internal│
└────┴──────────┴──────────────────────────────────────────────┴──────────┴──────────┴─────────┘
Key framework behaviors demonstrated:
- Sync hook blocks before broadcast — the advisor never sees the SIN
- InjectedEvent with target_channel — different messages to different channels
- Blocked event stored with audit trail — status=BLOCKED, blocked_by=hook_name
- Observations as side effects — compliance logging independent of event delivery
- The scanner can be a regex, an AI model, or an external service — the framework doesn't care what's inside the hook
Ambiguous Identity — Shared Family Phone Number (Use Case 7)¶
A customer sends an SMS from a phone number shared by 3 family members. The CRM returns 3 contacts. The framework creates a pending participant and the advisor resolves manually.
STEP 1: SMS arrives, identity resolver runs
────────────────────────────────────────────
SMS: "Bonjour, j'aimerais modifier mon placement"
from: +15551234567
→ Twilio webhook → POST /webhooks/sms/twilio
→ SMSChannel.handle_inbound() → InboundResult
→ Framework calls IdentityResolver.resolve(SMS, "+15551234567")
→ Integrator's CRM resolver:
SELECT * FROM contacts WHERE phone = '+15551234567'
→ 3 results: Marie Tremblay, Jean Tremblay, Sophie Tremblay
→ Returns: IdentityResult(
status="ambiguous",
candidates=[Marie, Jean, Sophie],
address="+15551234567",
channel_type=SMS,
)
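An integrator's resolver for this step could classify the CRM lookup by match count, as sketched below. The contacts table, the in-memory sqlite3 database and this IdentityResult dataclass are stand-ins for the integrator's CRM and RoomKit's own types. The query is parameterized instead of embedding the phone number in the SQL string.

```python
import sqlite3
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class IdentityResult:
    status: str                          # "identified" | "ambiguous" | "unknown"
    channel_type: str
    address: str
    identity: Optional[str] = None
    candidates: list[str] = field(default_factory=list)


def resolve_from_crm(conn: sqlite3.Connection, channel_type: str, address: str) -> IdentityResult:
    """Classify a CRM lookup by match count; never guess between several contacts."""
    rows = conn.execute(
        "SELECT name FROM contacts WHERE phone = ? ORDER BY name", (address,)
    ).fetchall()
    names = [row[0] for row in rows]
    if len(names) == 1:
        return IdentityResult("identified", channel_type, address, identity=names[0])
    if names:
        return IdentityResult("ambiguous", channel_type, address, candidates=names)
    return IdentityResult("unknown", channel_type, address)
```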
STEP 2: Framework fires on_identity_ambiguous hook
───────────────────────────────────────────────────
→ Integrator's hook runs:
- Checks for recent active room with any candidate → none found
- Returns: IdentityHookResult.pending(
display_name="Non identifié (+15551234567)",
candidates=[Marie, Jean, Sophie],
)
→ Framework creates participant with:
identification=pending
candidates=[Marie, Jean, Sophie]
display_name="Non identifié (+15551234567)"
STEP 3: Room created with pending participant
─────────────────────────────────────────────
Room: "session-99"
┌──────────┬────────────┬───────────────────────────────────────────┐
│ Channel │ Access │ Participant │
├──────────┼────────────┼───────────────────────────────────────────┤
│ SMS │ read_write │ PENDING — candidates: Marie, Jean, Sophie│
│ WS │ read_write │ Advisor Dupont (identified) │
└──────────┴────────────┴───────────────────────────────────────────┘
→ Message "Bonjour, j'aimerais modifier mon placement" enters room
→ Event stored with participant_id=pending_participant_id
→ Event delivered normally to advisor via WebSocket
Advisor's dashboard shows:
┌─────────────────────────────────────────────────────────┐
│ ⚠ Participant non identifié (+15551234567) │
│ │
│ Message: "Bonjour, j'aimerais modifier mon placement" │
│ │
│ Qui est-ce? │
│ ○ Marie Tremblay (dernière conversation: 2 jan.) │
│ ○ Jean Tremblay (dernière conversation: 15 déc.) │
│ ○ Sophie Tremblay (aucune conversation) │
│ │
│ [Confirmer] │
└─────────────────────────────────────────────────────────┘
STEP 4: Advisor resolves identity
─────────────────────────────────
Advisor selects "Marie Tremblay":
→ API: POST /api/v1/rooms/session-99/participants/{pending_id}/resolve
body: { "identity_id": "marie-tremblay-uuid" }
→ Framework:
1. Updates participant:
identification=identified
identity=Marie Tremblay
resolved_at=now
resolved_by="advisor"
candidates=None (cleared)
2. Updates ChannelBinding:
participant_id → marie-tremblay-uuid
3. Emits event: participant_identified
RoomEvent(
type=participant_identified,
content=SystemContent(
code="participant_identified",
message="Participant identifié: Marie Tremblay",
data={"identity_id": "marie-tremblay-uuid", "resolved_by": "advisor"}
),
)
4. Fires hook: on_participant_identified
→ Integrator can load conversation history, CRM context, etc.
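The participant updates and the audit event in Step 4 can be sketched on a plain dataclass. The field names follow the step. The ChannelBinding update and the on_participant_identified hook call are omitted. The check that restricts the choice to listed candidates is an added assumption; the RFC does not specify it.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Participant:
    id: str
    display_name: str
    identification: str = "pending"              # "pending" | "identified"
    identity_id: Optional[str] = None
    candidates: Optional[list[str]] = None
    resolved_at: Optional[datetime] = None
    resolved_by: Optional[str] = None


def resolve_participant(p: Participant, identity_id: str, resolved_by: str,
                        timeline: list[dict]) -> None:
    """Apply the Step 4 updates and append a participant_identified event."""
    if p.identification != "pending":
        raise ValueError(f"participant {p.id} is already identified")
    # Assumption: only a listed candidate may be chosen (not specified by the RFC).
    if p.candidates is not None and identity_id not in p.candidates:
        raise ValueError(f"{identity_id} is not a candidate for {p.id}")
    p.identification = "identified"
    p.identity_id = identity_id
    p.resolved_at = datetime.now(timezone.utc)
    p.resolved_by = resolved_by
    p.candidates = None                          # cleared once resolved
    timeline.append({
        "type": "participant_identified",
        "data": {"identity_id": identity_id, "resolved_by": resolved_by},
    })
```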
STEP 5: Conversation continues normally
────────────────────────────────────────
Room now has identified participant:
┌──────────┬────────────┬───────────────────────────────┐
│ Channel │ Access │ Participant │
├──────────┼────────────┼───────────────────────────────┤
│ SMS │ read_write │ Marie Tremblay (identified) │
│ WS │ read_write │ Advisor Dupont (identified) │
└──────────┴────────────┴───────────────────────────────┘
Advisor: "Bonjour Marie! Quel placement souhaitez-vous modifier?"
→ Normal conversation flow, identity resolved.
Timeline:
┌────┬──────────┬──────────────────────────────────────────────┬───────────────┬──────────┐
│ # │ Source │ Content │ Type │ Status │
├────┼──────────┼──────────────────────────────────────────────┼───────────────┼──────────┤
│ 1 │ system │ Room created │ system │ delivered│
│ 2 │ system │ SMS attached (participant=PENDING, │ ch_attach │ delivered│
│ │ │ candidates=[Marie, Jean, Sophie]) │ │ │
│ 3 │ system │ WS attached (participant=Advisor) │ ch_attach │ delivered│
│ 4 │ SMS │ "Bonjour, j'aimerais modifier mon placement"│ message │ delivered│
│ │ │ participant_id=PENDING │ │ │
│ 5 │ system │ Participant resolved → Marie Tremblay │ participant_ │ delivered│
│ │ │ resolved_by=advisor │ identified │ │
│ 6 │ WS │ "Bonjour Marie! Quel placement..." │ message │ delivered│
└────┴──────────┴──────────────────────────────────────────────┴───────────────┴──────────┘
Key framework behaviors demonstrated:
- IdentityResolver returns ambiguous result — framework doesn't guess, it asks
- on_identity_ambiguous hook — integrator picks the strategy (auto, pending, challenge)
- Pending participant — conversation flows normally while identity is unresolved
- Manual resolution via REST API — advisor sees candidates and picks one
- participant_identified event — clean audit trail in the timeline
- on_participant_identified hook — integrator can react (load CRM context, etc.)
- No spaghetti — resolver + one hook + API endpoint. Same pattern as everything else.
AI ↔ AI — Multi-Agent Collaboration (Use Case 8)¶
Two AI agents collaborate in a Room: an analyst reads documents and produces findings, a writer generates a client-ready report from those findings. An advisor supervises via WebSocket.
STEP 1: Room setup — analyst + writer + advisor
────────────────────────────────────────────────
Integrator creates room and attaches 3 channels:
Room: "report-42"
┌──────────┬────────────┬─────────────────────────┬─────────────┐
│ Channel │ Access │ Visibility │ Pattern │
├──────────┼────────────┼─────────────────────────┼─────────────┤
│ analyst │ read_write │ all │ Direct │
│ writer │ read_write │ all │ Direct │
│ ws_adv │ read_write │ all │ Direct │
└──────────┴────────────┴─────────────────────────┴─────────────┘
STEP 2: Advisor triggers the chain
───────────────────────────────────
Advisor sends: "Analyse le dossier client #1234 et produis un rapport"
→ RoomEvent(source=ws_adv, chain_depth=0)
→ Event Router broadcasts to all channels with read access:
→ analyst.on_event() receives the message (chain_depth=0)
→ writer.on_event() receives the message (chain_depth=0)
→ writer sees it's not from analyst, returns empty ChannelOutput
STEP 3: Analyst responds
─────────────────────────
analyst.on_event() produces:
ChannelOutput(
events=[RoomEvent(
content=TextContent(text="Analyse du dossier #1234:\n"
"1. Portefeuille équilibré — 60% actions, 40% obligations\n"
"2. Rendement YTD: +8.2%\n"
"3. Risque: Concentration dans le secteur technologique"),
)],
observations=[Observation(type="analysis_complete", data={...})],
)
→ Framework stores event with chain_depth=1 (parent was 0)
→ Event Router broadcasts to all channels:
→ writer.on_event() receives analyst's findings (chain_depth=1)
→ ws_adv delivers findings to advisor's browser
STEP 4: Writer responds to analyst
───────────────────────────────────
writer.on_event() produces:
ChannelOutput(
events=[RoomEvent(
content=TextContent(text="Rapport pour le client:\n\n"
"Cher client,\n\n"
"Votre portefeuille affiche une performance de +8.2%...\n"
"Recommandation: Rééquilibrer le secteur technologique..."),
)],
tasks=[Task(type="review", action="advisor_approval", data={...})],
)
→ Framework stores event with chain_depth=2 (parent was 1)
→ Event Router broadcasts:
→ analyst.on_event() receives writer's report (chain_depth=2)
→ ws_adv delivers report to advisor's browser
→ Task "advisor_approval" stored as side effect
STEP 5: Chain continues (analyst refines)
──────────────────────────────────────────
analyst.on_event() reviews writer's report:
→ "J'ajoute: le client a exprimé une tolérance au risque modérée"
→ chain_depth=3
writer.on_event() updates report:
→ "Rapport mis à jour avec la mention du profil de risque"
→ chain_depth=4
... chain continues as needed ...
STEP 6: Chain depth limit (safety)
──────────────────────────────────
If the back-and-forth continues until a response reaches max_event_chain_depth (default 10):
→ writer produces response at chain_depth=10 (analyst holds odd depths, writer even)
→ Framework blocks it:
status=BLOCKED
blocked_by="event_chain_depth_limit"
→ Observation emitted: type="event_chain_depth_exceeded"
→ Side effects still flow (tasks, observations from the blocked response)
→ Advisor sees the observation and can react (mute one AI, or trigger summary)
Timeline:
┌────┬──────────┬──────────────────────────────────────────┬───────┬──────────┐
│ # │ Source │ Content (truncated) │ Depth │ Status │
├────┼──────────┼──────────────────────────────────────────┼───────┼──────────┤
│ 1 │ ws_adv │ "Analyse le dossier #1234..." │ 0 │ delivered│
│ 2 │ analyst │ "Analyse: 1. Portefeuille équilibré..." │ 1 │ delivered│
│ 3 │ writer │ "Rapport: Cher client..." │ 2 │ delivered│
│ 4 │ analyst │ "J'ajoute: tolérance au risque..." │ 3 │ delivered│
│ 5 │ writer │ "Rapport mis à jour..." │ 4 │ delivered│
│ ...│ ... │ ... │ ... │ ... │
│ 11 │ writer │ (blocked — depth limit reached) │ 10 │ blocked │
└────┴──────────┴──────────────────────────────────────────┴───────┴──────────┘
Key framework behaviors demonstrated:
- AI ↔ AI uses the same primitives — no special multi-agent API. Two AIChannels in a Room, both read_write, both visibility=all.
- chain_depth tracking — every re-entry increments depth. Human origin = 0, analyst response = 1, writer response = 2, etc.
- chain_depth limit — prevents unbounded recursion. Uses existing blocked + blocked_by mechanism.
- Side effects always flow — even when chain is stopped, tasks and observations from the blocked event are stored.
- Human supervision — advisor sees everything in real-time, can mute an AI mid-chain, or trigger a summary.
- Dynamic control — integrator can mute the writer until the analyst emits an analysis_complete observation, then unmute. Same primitives as Use Case 5.
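The dynamic-control pattern in the last bullet could be wired as an observation hook. FakeFramework and the on_observation signature below are hypothetical stand-ins; hook registration is not shown. The mute() and unmute() coroutines mirror the mute/unmute primitives exposed through REST and MCP.

```python
class FakeFramework:
    """Hypothetical stand-in that only records mute state per (room, channel)."""

    def __init__(self) -> None:
        self.muted: dict[tuple[str, str], bool] = {}

    async def mute(self, room_id: str, channel_id: str) -> None:
        self.muted[(room_id, channel_id)] = True

    async def unmute(self, room_id: str, channel_id: str) -> None:
        self.muted[(room_id, channel_id)] = False


async def on_observation(fw: FakeFramework, room_id: str, source_channel: str,
                         obs_type: str) -> None:
    """Integrator hook: release the writer once the analyst reports it is done."""
    if source_channel == "analyst" and obs_type == "analysis_complete":
        await fw.unmute(room_id, "writer")


async def demo() -> bool:
    fw = FakeFramework()
    await fw.mute("report-42", "writer")                                   # writer starts muted
    await on_observation(fw, "report-42", "writer", "analysis_complete")   # ignored: wrong source
    await on_observation(fw, "report-42", "analyst", "analysis_complete")
    return fw.muted[("report-42", "writer")]
```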
10. AI Channel Implementation¶
The AI Channel wraps an AIProvider. It doesn't know about permissions — it just produces output. The framework enforces the rules. The provider handles the actual AI interaction (pydantic-ai, Anthropic SDK, OpenAI SDK, or any custom implementation).
class AIChannel(Channel):
    """AI channel — wraps any AIProvider."""
    type = ChannelType.AI
    category = ChannelCategory.INTELLIGENCE
    media_types = [ChannelMediaType.TEXT]
    def __init__(self, provider: AIProvider, name: str = "ai"):
        self.provider = provider
        self.name = name
    @property
    def info(self) -> dict[str, Any]:
        return {"provider": self.provider.name, "model": self.provider.model_name}
    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        """Process event. Framework handles permissions — we just produce output."""
        # Skip own events (prevent loops)
        if event.source_channel == self.id:
            return ChannelOutput.empty()
        # Only process messages
        if event.type != EventType.MESSAGE:
            return ChannelOutput.empty()
        # Build context: timeline + target channel capabilities + media types
        history = self._build_messages(room.timeline)
        target_capabilities = room.get_target_capabilities(event.participant_id)
        target_media_types = room.get_target_media_types(event.participant_id)
        context = AIContext(
            room=room,
            target_capabilities=target_capabilities,
            target_media_types=target_media_types,
            system_instructions=self._get_instructions(room),
            metadata=room.metadata,
        )
        # Call the provider — could be pydantic-ai, Anthropic SDK, OpenAI, anything
        result = await self.provider.generate(messages=history, context=context)
        return ChannelOutput(
            events=[RoomEvent(
                content=TextContent(text=result.text),
                source=EventSource(
                    channel_type=ChannelType.AI,
                    provider=self.provider.name,
                    channel_data=AIChannelData(**result.provider_metadata),
                ),
            )],
            tasks=result.tasks,
            observations=result.observations,
        )
        # NOTE: The framework decides if these events actually get broadcast
        # based on the channel's access, muted status, and write_visibility.
        # This channel doesn't need to know about any of that.
Usage — same AIChannel, different providers:
from pydantic_ai import Agent
from roomkit.providers.ai import PydanticAIProvider, AnthropicDirectProvider, MockAIProvider
# Option 1: pydantic-ai (recommended — supports Anthropic, OpenAI, Gemini, etc.)
agent = Agent(model="anthropic:claude-sonnet-4-5", instructions="You help customers...")
fw.register_channel(AIChannel(provider=PydanticAIProvider(agent), name="support"))
# Option 2: Anthropic SDK directly
from anthropic import AsyncAnthropic
client = AsyncAnthropic(api_key="sk-...")
fw.register_channel(AIChannel(provider=AnthropicDirectProvider(client, model="claude-sonnet-4-5"), name="support"))
# Option 3: Mock for testing
fw.register_channel(AIChannel(provider=MockAIProvider(responses=["Hello!"]), name="test_ai"))
# Option 4: Custom provider (integrator writes their own AIProvider)
fw.register_channel(AIChannel(provider=MyCustomProvider(...), name="custom_ai"))
The channel is dumb about permissions. It just produces output. The Event Router enforces the rules. This means the same AI channel can behave differently in different rooms depending on how the integrator configures its binding. And the same AIChannel class works with any AI provider.
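Option 4 needs a provider class. The interface below is inferred from how AIChannel calls its provider: a name, a model_name, and an async generate(messages=..., context=...) that returns text, tasks, observations and provider_metadata. Treat it as an assumption about RoomKit's AIProvider, not its published definition. ScriptedProvider is a hypothetical test double in the spirit of MockAIProvider.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AIResult:
    text: str
    tasks: list[Any] = field(default_factory=list)
    observations: list[Any] = field(default_factory=list)
    provider_metadata: dict[str, Any] = field(default_factory=dict)


class AIProvider(ABC):
    name: str
    model_name: str

    @abstractmethod
    async def generate(self, messages: list[dict[str, str]], context: Any) -> AIResult:
        """Produce a reply for the conversation so far."""


class ScriptedProvider(AIProvider):
    """Returns canned replies in order, then keeps repeating the last one."""
    name = "scripted"
    model_name = "none"

    def __init__(self, responses: list[str]):
        if not responses:
            raise ValueError("need at least one response")
        self._responses = responses
        self._calls = 0

    async def generate(self, messages, context) -> AIResult:
        reply = self._responses[min(self._calls, len(self._responses) - 1)]
        self._calls += 1
        return AIResult(text=reply)
```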
11. Conversation Store¶
11.1 Interface¶
ConversationStore (ABC)
├── Rooms
│ ├── create_room(org_id, metadata) -> Room
│ ├── get_room(room_id) -> Room | None
│ ├── find_rooms(org_id, filters) -> list[Room]
│ ├── update_room(room_id, updates) -> Room
│ └── close_room(room_id) -> Room
├── Events
│ ├── add_event(room_id, event) -> RoomEvent
│ ├── get_timeline(room_id, since, limit, offset, visibility_filter) -> list[RoomEvent]
│ └── get_event(event_id) -> RoomEvent | None
├── Channels
│ ├── attach_channel(room_id, binding) -> ChannelBinding
│ ├── detach_channel(room_id, channel_id) -> None
│ ├── update_binding(room_id, channel_id, updates) -> ChannelBinding
│ └── get_bindings(room_id) -> list[ChannelBinding]
├── Participants
│ ├── add_participant(room_id, participant) -> Participant
│ ├── remove_participant(room_id, participant_id) -> None
│ └── get_participants(room_id) -> list[Participant]
├── Identity
│ ├── create_identity(org_id, data) -> Identity
│ ├── resolve_identity(channel, address, org_id) -> Identity | None
│ ├── link_address(identity_id, channel, address) -> None
│ └── get_identity(identity_id) -> Identity | None
├── Tasks
│ ├── create_task(room_id, task) -> Task
│ ├── get_tasks(room_id | org_id, filters) -> list[Task]
│ └── update_task(task_id, updates) -> Task
└── Observations
├── add_observation(room_id, observation) -> Observation
└── get_observations(room_id, filters) -> list[Observation]
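An InMemoryStore could back the Events part of this interface with a dict of lists. The sketch below covers only add_event and get_timeline, and the interface lists only parameter names, so the semantics chosen here are assumptions. Here since means strictly after a timestamp, limit/offset slice the list after filtering, and visibility_filter is a set of allowed visibility values.

```python
import itertools
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class RoomEvent:
    room_id: str
    text: str
    created_at: datetime
    visibility: str = "all"        # "all", "internal", or a channel type such as "http"
    id: str = ""


class InMemoryEventStore:
    """Hypothetical slice of an InMemoryStore: events only, no persistence."""

    def __init__(self) -> None:
        self._events: dict[str, list[RoomEvent]] = {}
        self._ids = itertools.count(1)

    async def add_event(self, room_id: str, event: RoomEvent) -> RoomEvent:
        event.id = f"evt_{next(self._ids)}"
        self._events.setdefault(room_id, []).append(event)   # append keeps timeline order
        return event

    async def get_timeline(self, room_id: str, since: Optional[datetime] = None,
                           limit: int = 50, offset: int = 0,
                           visibility_filter: Optional[set[str]] = None) -> list[RoomEvent]:
        events = self._events.get(room_id, [])
        if since is not None:
            events = [e for e in events if e.created_at > since]
        if visibility_filter is not None:
            events = [e for e in events if e.visibility in visibility_filter]
        return events[offset:offset + limit]                 # slicing returns a new list
```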
11.2 Implementations¶
| Implementation | Purpose |
|---|---|
| InMemoryStore | Testing, prototyping |
| PostgresStore | Production (SQLAlchemy async) |
| CustomStore | Integrator wraps their own DB |
12. Error Handling & Resilience¶
12.1 Send Results¶
SendResult
├── status: sent | queued | failed
├── provider_message_id: str | None
├── error: SendError | None (code, message, retryable)
└── retry_after: datetime | None
12.2 Retry¶
- Exponential backoff with configurable max retries
- Per-channel retry policies
- Fallback channel option (SMS fails → try Email)
- Dead letter queue for permanent failures
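The retry bullets combine into one loop. It retries only failures marked retryable, waits longer each time, and then hands off to a fallback channel. This is a minimal sketch. SendResult is trimmed to the two fields the loop reads, and the send/fallback callables and injectable sleep are assumptions added for testability. A production policy would usually add random jitter to the delays.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class SendResult:
    status: str                 # "sent" | "queued" | "failed"
    retryable: bool = False


def backoff_delays(max_retries: int, base: float = 1.0, factor: float = 2.0,
                   cap: float = 60.0) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ... capped at `cap`."""
    return [min(cap, base * factor ** i) for i in range(max_retries)]


async def send_with_retry(
    send: Callable[[], Awaitable[SendResult]],
    fallback: Optional[Callable[[], Awaitable[SendResult]]] = None,
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> SendResult:
    """Retry retryable failures with backoff, then try the fallback channel once."""
    result = await send()
    for delay in backoff_delays(max_retries):
        if result.status != "failed" or not result.retryable:
            break
        await sleep(delay)
        result = await send()
    if result.status == "failed" and fallback is not None:
        return await fallback()         # e.g. SMS failed -> try Email
    return result                       # still failed: caller moves it to the dead letter queue
```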
12.3 Rate Limiting¶
- Declared per channel via ChannelCapabilities.rate_limit
- Framework queues when rate-limited (no drops)
- Per-organization limits supported
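One way to queue instead of drop is a token bucket whose balance may go negative. Each call past the burst reserves a future slot and learns how long to wait. This sketch rests on that assumption. The rate and burst would come from ChannelCapabilities.rate_limit, whose shape is not specified here. A per-organization limit could keep one bucket per org_id.

```python
import asyncio
import time
from typing import Callable


class TokenBucket:
    """Rate limiter that never drops: callers past the burst are told how long to wait."""

    def __init__(self, rate_per_sec: float, burst: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate_per_sec
        self.burst = burst
        self.clock = clock
        self.tokens = float(burst)
        self.last = clock()

    def reserve(self) -> float:
        """Take one token and return the wait in seconds (0.0 if available now)."""
        now = self.clock()
        # Refill for the elapsed time, never above the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= 1                 # may go negative: that is the queue
        return 0.0 if self.tokens >= 0 else -self.tokens / self.rate

    async def acquire(self) -> None:
        delay = self.reserve()
        if delay > 0:
            await asyncio.sleep(delay)
```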
12.4 Circuit Breaker¶
- Opens after N consecutive provider failures
- Emits system RoomEvent
- Periodic probe to detect recovery
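The three circuit-breaker bullets map onto a small state machine. The circuit is closed, opens after N consecutive failures, and becomes half-open once a probe interval has passed. This sketch uses an injectable clock. For simplicity it lets every caller through while half-open, rather than exactly one probe. Emitting the system RoomEvent is left to the caller, via the return value of record_failure().

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """closed -> open after `threshold` consecutive failures -> half_open after `probe_after` s."""

    def __init__(self, threshold: int = 5, probe_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.probe_after = probe_after
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.probe_after:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None            # probe succeeded: close the circuit

    def record_failure(self) -> bool:
        """Count a failure; return True only when the circuit goes from closed to open."""
        state = self.state
        self.failures += 1
        if state == "half_open":
            self.opened_at = self.clock()    # probe failed: reopen and restart the timer
            return False
        if state == "closed" and self.failures >= self.threshold:
            self.opened_at = self.clock()
            return True
        return False
```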
12.5 AI Resilience¶
- LLM timeout → configurable fallback (canned response or escalate)
- Token limit → trigger conversation summarization
- Model unavailable → circuit breaker on AI provider
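For the timeout bullet, asyncio.wait_for() is enough to bound a provider call. The minimal sketch below returns a canned reply on timeout, plus a flag the caller could use to escalate instead. generate_or_fallback is a hypothetical helper, not a RoomKit API.

```python
import asyncio
from typing import Awaitable, Callable


async def generate_or_fallback(
    generate: Callable[[], Awaitable[str]],
    timeout_s: float,
    fallback_text: str,
) -> tuple[str, bool]:
    """Return (text, used_fallback); on timeout the provider call is cancelled."""
    try:
        return await asyncio.wait_for(generate(), timeout=timeout_s), False
    except asyncio.TimeoutError:
        return fallback_text, True
```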
12.6 Event Chain Depth Limit¶
When an event is broadcast to a channel with on_event() (e.g., an AI channel), the channel's response re-enters the pipeline as a new event. That new event may itself trigger another channel's on_event(), and so on. Without a bound, this creates unbounded recursion.
Mechanism:
- RoomKit config: max_event_chain_depth: int = 10 (global, like sys.setrecursionlimit())
- Tracking: The Event Router sets chain_depth=0 on human/inbound events. When a channel's on_event() produces a response that re-enters the pipeline, the response event's chain_depth is set to parent.chain_depth + 1.
- Enforcement: When chain_depth >= max_event_chain_depth, the response event is blocked using the existing mechanism: status=blocked, blocked_by="event_chain_depth_limit".
- Observability: An observation is emitted (type="event_chain_depth_exceeded") with the chain depth, source channel, and room ID.
- Side effects still flow: Consistent with mute behavior — tasks and observations from the blocked event are still stored and dispatched.
Example: A room with two AI channels that react to each other's messages. The human message starts the chain at chain_depth=0. The first nine AI responses (depths 1 to 9) are delivered, and the tenth response (chain_depth=10, the 11th event in the chain) is blocked. The integrator sees the observation and can react (e.g., alert, mute one channel).
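The tracking and enforcement rules above fit in one function. The ping-pong helper replays the two-AI example. Event and the function names are hypothetical, and the chain is modeled as a list rather than as events flowing through the Event Router.

```python
from dataclasses import dataclass
from typing import Optional

MAX_EVENT_CHAIN_DEPTH = 10   # default of the global max_event_chain_depth setting


@dataclass
class Event:
    source: str
    chain_depth: int = 0                 # 0 for human/inbound events
    status: str = "delivered"
    blocked_by: Optional[str] = None


def response_event(parent: Event, source: str, max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> Event:
    """Stamp a channel's response with parent depth + 1 and block it at the limit."""
    child = Event(source=source, chain_depth=parent.chain_depth + 1)
    if child.chain_depth >= max_depth:
        child.status = "blocked"
        child.blocked_by = "event_chain_depth_limit"
    return child


def simulate_ping_pong(max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> list[Event]:
    """Two AI channels answering each other after one human message."""
    chain = [Event(source="ws_adv")]
    speakers = ("analyst", "writer")
    while chain[-1].status == "delivered":
        speaker = speakers[(len(chain) - 1) % 2]   # analyst answers first
        chain.append(response_event(chain[-1], speaker, max_depth))
    return chain
```

Running the simulation gives nine delivered AI responses (depths 1 to 9) and a blocked tenth at chain_depth=10. The >= comparison is what places the cut-off exactly at the configured value.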
What this does NOT do:
- No per-room or per-channel depth config — it's a global safety limit
- No new enum — uses existing EventStatus.blocked + blocked_by
- No new hook trigger — integrator uses existing after_broadcast or observations to react
12.7 Observability¶
RoomKit uses Python's standard logging module. No third-party dependency. The integrator configures handlers, formatters, and log levels — the library just emits logs.
Named loggers:
Every module has its own logger. The integrator controls granularity.
roomkit # root logger
roomkit.core.framework # RoomKit lifecycle (create_room, attach_channel, etc.)
roomkit.core.router # EventRouter — broadcast decisions, permission checks, transcoding
roomkit.core.hooks # HookEngine — hook execution, block/modify/allow decisions
roomkit.core.locks # RoomLockManager — lock acquire/release, LRU eviction
roomkit.channels.sms # SMSChannel — inbound parsing, delivery
roomkit.channels.email # EmailChannel
roomkit.channels.websocket # WebSocketChannel — connect/disconnect, delivery
roomkit.channels.ai # AIChannel — context building, AI generation, response handling
roomkit.channels.whatsapp # WhatsAppChannel
roomkit.providers.sms.twilio # TwilioSMSProvider — HTTP calls, webhook parsing
roomkit.providers.sms.sinch # SinchSMSProvider
roomkit.providers.email.sendgrid # SendGridProvider
roomkit.providers.ai.pydantic_ai # PydanticAIProvider — prompt building, token usage
roomkit.providers.ai.anthropic # AnthropicDirectProvider
roomkit.providers.ai.openai # OpenAIDirectProvider
roomkit.identity # IdentityResolver — resolve, ambiguous, unknown decisions
roomkit.store # ConversationStore — save/get operations
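The loggers form a dotted hierarchy, so an integrator can set levels per subtree with the standard library alone. This is a minimal sketch; the chosen levels are only an example.

```python
import logging

# Integrator-side setup: one root handler, quiet by default, verbose where debugging.
logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(name)s %(message)s")
logging.getLogger("roomkit").setLevel(logging.INFO)                          # whole library
logging.getLogger("roomkit.core.router").setLevel(logging.DEBUG)            # broadcast decisions
logging.getLogger("roomkit.providers.sms.twilio").setLevel(logging.WARNING)  # HTTP-level traces off
```

A child logger with no level of its own, such as roomkit.channels.sms here, inherits the level of its nearest configured ancestor. The roomkit line therefore covers every module not listed explicitly.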
Log levels — what gets logged where:
| Level | What | Example |
|---|---|---|
| DEBUG | Full pipeline trace, raw payloads, hook decisions, delivery details | "EventRouter: delivering evt_abc to sms_customer (chain_depth=0)" |
| INFO | Room created, event stored, channel attached/detached, participant resolved | "Room room_8f3a created for org_acme" |
| WARNING | Delivery failed (retryable), hook timeout, chain depth approaching limit, identity ambiguous | "SMSChannel: delivery failed to +15551234567 (retryable=True, attempt=2/3)" |
| ERROR | Provider error (non-retryable), circuit breaker opened, store write failure | "TwilioSMSProvider: 401 Unauthorized — check credentials" |
Structured log context:
Every log message includes structured context via the extra keyword argument of the standard logger methods:
import logging
logger = logging.getLogger("roomkit.core.router")
logger.info(
    "Event delivered",
    extra={
        "room_id": "room_8f3a",
        "event_id": "evt_abc",
        "channel_id": "sms_customer",
        "provider": "twilio",
        "chain_depth": 0,
        "status": "sent",
        "provider_message_id": "SM1234567890",
        "latency_ms": 245,
    },
)
The integrator's formatter decides how to render this — plain text, JSON, or whatever their log pipeline expects.
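The JSON case can be covered with a small logging.Formatter subclass. Keys passed through extra= become attributes on the LogRecord, and the formatter copies a known set of them into the output. JsonFormatter and its key list are hypothetical and match the example above. Exception info is not rendered in this sketch.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render a record plus the structured context passed via `extra=`."""
    CONTEXT_KEYS = ("room_id", "event_id", "channel_id", "provider",
                    "chain_depth", "status", "provider_message_id", "latency_ms")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "logger": record.name,
                   "message": record.getMessage()}
        # Only keys actually supplied through `extra=` exist on the record.
        for key in self.CONTEXT_KEYS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload, ensure_ascii=False)
```

An integrator would attach it with handler.setFormatter(JsonFormatter()) on whichever handler feeds their log pipeline.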
Delivery tracing — debugging "my email wasn't sent":
The full lifecycle of a delivery is logged:
```text
DEBUG roomkit.core.router EventRouter: broadcasting evt_abc to 3 channels
DEBUG roomkit.core.router EventRouter: channel sms_customer — access=read_write, muted=False, visible=True → DELIVER
DEBUG roomkit.channels.sms SMSChannel: delivering evt_abc to +15551234567
DEBUG roomkit.providers.sms.twilio TwilioSMSProvider: POST /Messages {to: +15551234567, body: "Bonjour..."}
DEBUG roomkit.providers.sms.twilio TwilioSMSProvider: 201 Created {sid: SM123, status: queued}
INFO roomkit.core.router Event evt_abc delivered to sms_customer (provider_message_id=SM123, latency_ms=245)
DEBUG roomkit.core.router EventRouter: channel email_customer — access=read_write, muted=False, visible=True → DELIVER
DEBUG roomkit.channels.email EmailChannel: delivering evt_abc to client@example.com
DEBUG roomkit.providers.email.sendgrid SendGridProvider: POST /mail/send {to: client@example.com}
WARNING roomkit.providers.email.sendgrid SendGridProvider: 429 Too Many Requests (retry_after=30s)
WARNING roomkit.channels.email EmailChannel: delivery failed for evt_abc — retryable, queued for retry
ERROR roomkit.channels.email EmailChannel: delivery failed for evt_abc after 3 attempts — SendGrid 429
DEBUG roomkit.core.router EventRouter: channel ai_support — access=read, muted=False → ON_EVENT (intelligence)
DEBUG roomkit.channels.ai AIChannel: building context for room_8f3a (5 messages, target=sms)
DEBUG roomkit.providers.ai.pydantic_ai PydanticAIProvider: generating (model=claude-sonnet-4-5, tokens_in=1250)
DEBUG roomkit.providers.ai.pydantic_ai PydanticAIProvider: complete (tokens_out=89, latency_ms=1340)
INFO roomkit.core.router AIChannel produced response — re-entering pipeline (chain_depth=1)
```
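To pull one event's lines out of a busy log, an integrator could attach a standard `logging.Filter` to a handler. The filter keeps only records whose structured context carries a given `event_id`. This sketch assumes the library passes `event_id` via `extra=` on every pipeline log line, as the structured-context section states. `EventIdFilter`, `ListHandler`, and `trace_event` are hypothetical names.

```python
import logging


class EventIdFilter(logging.Filter):
    """Pass only records whose structured context carries a given event_id."""

    def __init__(self, event_id: str) -> None:
        super().__init__()
        self.event_id = event_id

    def filter(self, record: logging.LogRecord) -> bool:
        # Records logged without extra={"event_id": ...} simply lack the attribute.
        return getattr(record, "event_id", None) == self.event_id


class ListHandler(logging.Handler):
    """Collect messages in memory (handy in tests or a debug endpoint)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def trace_event(event_id: str) -> ListHandler:
    """Capture every roomkit.* log line whose context mentions `event_id`."""
    handler = ListHandler()
    # Handler-level filters also see records propagated from child loggers.
    handler.addFilter(EventIdFilter(event_id))
    root = logging.getLogger("roomkit")
    root.setLevel(logging.DEBUG)  # side effect: enables DEBUG for all roomkit.* loggers
    root.addHandler(handler)
    return handler
```

The filter is attached to the handler rather than to the `roomkit` logger on purpose. Logger-level filters do not run for records propagated up from children such as `roomkit.channels.email`, but handler-level filters do.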
Framework events for monitoring:
In addition to logs, RoomKit emits structured framework events for programmatic observability:
```python
@fw.on("channel_error")
async def on_error(event: FrameworkEvent):
    # event.data = {"channel_id": "email_customer", "error": "429 Too Many Requests",
    #               "room_id": "room_8f3a", "event_id": "evt_abc", "retryable": True}
    await alerting.send(f"Channel {event.data['channel_id']} error: {event.data['error']}")


@fw.on("delivery_failed")
async def on_delivery_failed(event: FrameworkEvent):
    # event.data = {"channel_id": "email_customer", "event_id": "evt_abc",
    #               "provider": "sendgrid", "attempts": 3, "last_error": "429"}
    await metrics.increment("delivery.failed", tags={"provider": event.data["provider"]})
```
| Framework Event | When | Data |
|---|---|---|
| channel_error | Any provider error (retryable or not) | channel_id, error, room_id, event_id, retryable |
| delivery_failed | All retry attempts exhausted | channel_id, event_id, provider, attempts, last_error |
| delivery_succeeded | Event successfully delivered | channel_id, event_id, provider_message_id, latency_ms |
| hook_timeout | A sync or async hook exceeded its timeout | hook_name, trigger, timeout_ms, room_id |
| hook_error | A hook raised an exception | hook_name, trigger, error, room_id |
| circuit_breaker_opened | Provider circuit breaker tripped | provider, channel_type, failure_count |
| circuit_breaker_closed | Provider recovered | provider, channel_type |
| identity_ambiguous | Identity resolution returned multiple candidates | address, channel_type, candidate_count |
| chain_depth_exceeded | Event blocked by chain depth limit | room_id, channel_id, chain_depth |
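The `@fw.on(...)` subscription model can be pictured as a small async publish/subscribe registry. This is a self-contained, hypothetical sketch: `FrameworkEvent`, `EventBus`, and `emit` are illustrative names, not the RFC's API. It assumes that subscribers run concurrently and that one failing subscriber, such as a broken alerting client, must not stop the others.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

Handler = Callable[["FrameworkEvent"], Awaitable[None]]


@dataclass
class FrameworkEvent:
    name: str
    data: dict[str, Any] = field(default_factory=dict)


class EventBus:
    """Minimal event-name -> handlers registry with an `on` decorator."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, name: str) -> Callable[[Handler], Handler]:
        def register(func: Handler) -> Handler:
            self._handlers[name].append(func)
            return func

        return register

    async def emit(self, name: str, **data: Any) -> list[BaseException]:
        """Run all subscribers of `name` concurrently; return their errors."""
        event = FrameworkEvent(name, data)
        # return_exceptions=True: a failing subscriber is reported, not propagated.
        results = await asyncio.gather(
            *(handler(event) for handler in self._handlers.get(name, [])),
            return_exceptions=True,
        )
        return [r for r in results if isinstance(r, BaseException)]
```

Returning the collected errors, rather than raising, lets the emitter log them without interrupting the delivery pipeline that fired the event.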
Integration service adds:
The library provides raw logs + framework events. The integration service adds the production observability stack:
| Concern | Library (roomkit) | Integration Service |
|---|---|---|
| Logging | logging with named loggers + structured extra | JSON formatter, log aggregation (CloudWatch, Datadog, etc.) |
| Tracing | Logs include room_id, event_id, channel_id for correlation | OpenTelemetry spans, distributed tracing |
| Metrics | Framework events (delivery_succeeded, delivery_failed, etc.) | Prometheus counters/histograms, Grafana dashboards |
| Alerting | Framework events with error details | PagerDuty, Slack alerts via @fw.on() handlers |
| Error tracking | Python exceptions with full context | Sentry integration in hook/channel error handlers |
Integrator setup — minimal:
```python
import logging

# See everything during development
logging.getLogger("roomkit").setLevel(logging.DEBUG)

# Production: only warnings and errors
logging.getLogger("roomkit").setLevel(logging.WARNING)

# Fine-grained: debug one provider, keep the rest at WARNING
logging.getLogger("roomkit").setLevel(logging.WARNING)
logging.getLogger("roomkit.providers.sms.twilio").setLevel(logging.DEBUG)
```
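The fine-grained setting works because Python loggers form a dot-separated hierarchy. A logger with no level of its own inherits the nearest ancestor's level. Here is a quick standard-library check using logger names from the list above:

```python
import logging

logging.getLogger("roomkit").setLevel(logging.WARNING)
logging.getLogger("roomkit.providers.sms.twilio").setLevel(logging.DEBUG)

twilio = logging.getLogger("roomkit.providers.sms.twilio")
sendgrid = logging.getLogger("roomkit.providers.email.sendgrid")

# Twilio has its own level; SendGrid has none, so it inherits WARNING from "roomkit".
print(twilio.getEffectiveLevel() == logging.DEBUG)      # True
print(sendgrid.getEffectiveLevel() == logging.WARNING)  # True
print(sendgrid.isEnabledFor(logging.DEBUG))             # False
```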
13. RoomKit API (Integrator's View)
13.1 Minimal: Human ↔ AI via SMS
```python
from pydantic_ai import Agent

from roomkit import RoomKit
from roomkit.channels.ai import AIChannel
from roomkit.channels.sms import SMSChannel
from roomkit.identity.mock import MockIdentityResolver
from roomkit.providers.ai import PydanticAIProvider
from roomkit.providers.sms import TwilioConfig, TwilioSMSProvider
from roomkit.store import InMemoryStore

agent = Agent(model="anthropic:claude-sonnet-4-5", instructions="You help customers...")

fw = RoomKit(
    store=InMemoryStore(),
    identity_resolver=MockIdentityResolver(),
)

# Providers take a typed config (see TwilioConfig in Appendix B)
sms_provider = TwilioSMSProvider(
    config=TwilioConfig(account_sid="AC...", auth_token="...", from_number="+15551234567"),
)
fw.register_channel(SMSChannel(provider=sms_provider))
fw.register_channel(AIChannel(provider=PydanticAIProvider(agent), name="support"))


# Default room setup: attach the AI with full access
@fw.hook(trigger="on_room_created", execution="async")
async def setup_room(room: RoomContext):
    await fw.attach_channel(room.id, "support", access="read_write", visibility="all")


# --- Integration service (NOT part of the library) ---
# The integrator wraps fw.* calls in their own web framework:
from fastapi import FastAPI, Request, Response
from pydantic import BaseModel

app = FastAPI()


class CreateRoomRequest(BaseModel):  # the integrator's own request schema
    organization_id: str
    metadata: dict = {}


@app.post("/webhooks/sms/twilio")
async def twilio_webhook(request: Request):
    payload = await request.form()
    await fw.process_inbound("sms_twilio_1", dict(payload))
    return Response(status_code=200)


@app.post("/api/v1/rooms")
async def create_room(body: CreateRoomRequest):
    return await fw.create_room(organization_id=body.organization_id, metadata=body.metadata)
```
13.2 Full: Human ↔ Human + AI + Hooks + Permissions
```python
from roomkit import RoomKit
from roomkit.channels.ai import AIChannel
from roomkit.channels.http import HTTPChannel
from roomkit.channels.sms import SMSChannel
from roomkit.identity import IdentityResolver  # base class for CRMIdentityResolver
from roomkit.models.channel import Visibility
from roomkit.models.event import EventType, RoomEvent
from roomkit.models.hook import HookResult
from roomkit.models.task import Task
from roomkit.providers.ai import PydanticAIProvider
from roomkit.providers.sms import TwilioSMSProvider
from roomkit.store import InMemoryStore  # or PostgresStore from integration service

fw = RoomKit(
    store=InMemoryStore(),
    identity_resolver=CRMIdentityResolver(crm_client),  # integrator's IdentityResolver subclass
)

# Transport channels
fw.register_channel(SMSChannel(provider=TwilioSMSProvider(...)))
fw.register_channel(HTTPChannel())

# AI channels (provider-agnostic — swap PydanticAIProvider for any AIProvider)
fw.register_channel(AIChannel(provider=PydanticAIProvider(support_agent), name="support"))
fw.register_channel(AIChannel(provider=PydanticAIProvider(task_agent), name="task_creator"))


# --- Hooks: PII blocking (sync, blocks events) ---
@fw.hook(trigger="before_broadcast", execution="sync", priority=0)
async def pii_blocker(event: RoomEvent, room: RoomContext) -> HookResult:
    if event.type == EventType.MESSAGE and contains_pii(event.content.text):
        return HookResult.block("PII detected")
    return HookResult.allow()


# --- Hooks: audit logging (async) ---
@fw.hook(trigger="after_broadcast", execution="async")
async def audit_log(event: RoomEvent, room: RoomContext):
    await audit_service.log(event)


# --- Hooks: task dispatch (async) ---
@fw.hook(trigger="on_task_created", execution="async")
async def dispatch_task(task: Task, room: RoomContext):
    if task.type == "crm_update":
        await crm.update(task.data)
    elif task.type == "escalate":
        await fw.attach_channel(room.id, "http", access="read_write",
                                participant=on_call_agent)
        await fw.mute(room.id, "support")
        await fw.set_visibility(room.id, "support", Visibility.channels(["http"]))


# --- Hooks: room setup (async, fire-and-forget) ---
@fw.hook(trigger="on_room_created", execution="async")
async def setup_room(room: RoomContext):
    await fw.attach_channel(room.id, "support", access="read_write", visibility="all")
    await fw.attach_channel(room.id, "task_creator", access="read")  # read-only, side effects only


# --- Hooks: when a human agent joins, reconfigure the AI (async) ---
@fw.hook(trigger="on_channel_attached", execution="async")
async def on_agent_joins(event: ChannelEvent, room: RoomContext):
    if event.participant and event.participant.role == "agent":
        # Mute AI channels and restrict their visibility to agents only
        for binding in room.channels:
            if binding.category == ChannelCategory.INTELLIGENCE:
                await fw.mute(room.id, binding.channel_id)
                await fw.set_visibility(room.id, binding.channel_id,
                                        Visibility.channels(["http"]))


# --- Integration service (NOT part of the library) ---
# The integrator wraps fw.* calls in their own web framework:
from fastapi import FastAPI, Request, Response

app = FastAPI()


@app.post("/webhooks/sms/twilio")
async def twilio_webhook(request: Request):
    payload = await request.form()
    await fw.process_inbound("sms_twilio_1", dict(payload))
    return Response(status_code=200)

# ... more routes wrapping fw.* calls
# MCP server is also mounted by the integration service, not the library.
```
14. RoomKit Primitives Summary
The framework provides primitives. Business logic is in the integrator's hooks and code.
┌──────────────────────────────────────┬──────────────────────────────────────┐
│ FRAMEWORK (primitives) │ INTEGRATOR (business logic) │
├──────────────────────────────────────┼──────────────────────────────────────┤
│ Channel access: read | write | rw │ Which access level each channel gets │
│ Mute / unmute a channel │ When to mute/unmute │
│ Visibility: all | channels | internal│ What visibility each channel gets │
│ Attach / detach channels │ Which channels to attach/detach │
│ Hook pipeline (sync/async) │ Hook handlers (PII, compliance, etc.)│
│ Hook block + inject targeted events │ What to block, what to inject │
│ Event status (delivered/blocked/fail)│ How to query and audit blocked events│
│ Event routing respects permissions │ Room setup and channel configuration │
│ Two output paths (events vs effects) │ What tasks to create, how to act │
│ Timeline stores everything (+ blocked)│ What to do with timeline data │
│ REST API + MCP expose operations │ Authentication, authorization │
│ Provider abstraction │ Which provider to use │
│ Identity resolution ABC │ Resolution strategy │
│ Store ABC │ Store implementation choice │
│ Event chain depth limit (configurable)│ Turn budgets, orchestration patterns│
└──────────────────────────────────────┴──────────────────────────────────────┘
What the framework does NOT decide:
- When to mute an AI → integrator hook
- When to make AI visible only to agents → integrator hook
- What "observer mode" means → integrator configures access=read
- What "assistant mode" means → integrator configures visibility=channels(["http"])
- When to escalate to human → integrator hook on task/observation
- What routing to use → integrator hook on room_created
15. Scope & Phasing
Phase 0: Core Models & Primitives
- Pydantic models: Room, RoomEvent, EventContent, ChannelBinding, Participant, Identity, Task, Observation
- Permission system: Access, Muted, Visibility
- Channel ABC (unified interface)
- ChannelOutput with two paths (events + side effects)
- ConversationStore ABC + InMemoryStore
- Event Router (broadcast with permission enforcement)
- Hook engine (sync + async pipeline)
- Event chain depth tracking (library-level safety primitive)
- Goal: Validate model against all 5 use cases with unit tests
- Deliverable: Models + Store + Router + Hooks + Permissions. No real channels.
Phase 1: HTTP + SMS + Single AI Channel
- WebSocketChannel (real-time) and HTTPChannel (REST polling fallback) for browser users
- SMSChannel + TwilioSMSProvider
- AIChannel (pydantic-ai)
- Provider abstraction (SMSProvider ABC)
- Channel capability injection into AI
- Basic identity resolution
- REST API (built in the integration service by wrapping fw.* calls): rooms, events, channels, timeline, permissions
- Goal: Use Cases 1 and 2 end-to-end
Phase 2: Dynamic Permissions + Tasks
- Full permission lifecycle: mute/unmute, set_access, set_visibility via API
- Task and observation storage
- Side effects always flow (even when muted)
- on_task_created, on_channel_attached hooks
- Multiple AI channels per Room with different permissions
- Goal: Use Cases 3, 4, 5 (dynamic combinations with permissions)
Phase 3: MCP Server
- MCP tools: send_message, create_task, mute/unmute, set_visibility
- MCP resources: room, timeline, channels, tasks
- AI agents interact with Rooms via MCP
- Goal: AI-native Room interaction
Phase 4: Email + WhatsApp
- EmailChannel + SendGridProvider / SMTPProvider
- WhatsAppChannel + MetaProvider
- Rich EventContent (buttons, cards, media)
- Channel-specific ChannelData
- Goal: Rich content across diverse channels
Phase 5: Resilience & Production
- Retry + exponential backoff
- Rate limiting
- Circuit breaker
- PostgresStore
- Dead letter queue
- Delivery status tracking
- Structured logging, metrics hooks
- Goal: Production-grade reliability
Phase 6: Extensibility
- Plugin system for custom channels and providers
- Custom event types
- Advanced identity resolution
- Conversation summarization
- Multi-tenant channel configuration
- Goal: Third parties can build on the framework
16. Open Questions
Resolved
| # | Question | Decision | Rationale |
|---|---|---|---|
| Q1 | Event Broadcasting Order | Concurrent (Option A) | Technical requirements use asyncio.TaskGroup for concurrent broadcast. Simpler, avoids priority complexity. Read-only channels that need to complete before write channels can be handled via hooks. |
| Q2 | Multiple Read-Write Channels | Let both respond (Option A) | Not an issue — the framework broadcasts the event. If the integrator doesn't want two responders, they mute one. The framework provides primitives (mute, visibility). Business logic decides when to use them. |
| Q3 | REST API Framework | Framework-agnostic (Option B) | The library has no web framework dependency. The integration service uses FastAPI. fw.* methods are plain async — the integrator wraps them in any framework. |
| Q4 | HTTP Channel for Browser Users | Both (Option C) | WebSocketChannel for real-time, HTTPChannel as REST polling fallback. Both are in the project structure. |
| Q5 | Hook Registration | Both (Option C) | Code decorators for core logic, POST /rooms/{id}/hooks API for runtime dynamic rules. |
| Q6 | Naming | roomkit | Clear, short, descriptive. The "Room" is the core abstraction. |
| Q7 | Relation to TchatNSign | Independent open source library | roomkit is not a TchatNSign library. It is built independently for open source release with community. TchatNSign uses roomkit as a dependency in its integration service — no coupling in the other direction. |
All questions resolved. No open questions remain.
17. Key Design Principles
- The Room is the truth — All state lives in the Room. Timeline records everything.
- Everything is a Channel — SMS, browser, AI, integration. Same interface.
- Primitives, not opinions — The framework provides access/mute/visibility. Business logic decides when to use them.
- Two output paths — Room events (subject to permissions) and side effects (always flow). Muting silences the voice, not the brain.
- Providers are swappable — Channel type ≠ provider. Twilio and Sinch both provide SMS. Anthropic and OpenAI both provide AI.
- Hooks intercept — Sync hooks block/modify. Async hooks observe/react. Pipeline architecture.
- Channels are dynamic — Attach, detach, mute, unmute, reconfigure at any time.
- Channel awareness at generation — AI knows target constraints and media types before generating.
- Three layers of channel data — Channel.info (instance), ChannelBinding.metadata (per-room), EventSource (per-event). Never lose data.
- REST + MCP + WebSocket — REST for systems, MCP for AI, WebSocket for real-time. All call the same core.
- Direction declares capability — Channels declare inbound/outbound/bidirectional. Permissions restrict per room.
- Two event levels — Room events (per-room, stored) and framework events (global, for subscribers).
- Media types are first-class — text, audio, video. Route to compatible channels. Ready for the future.
- Chain depth safety — Event chains (channel A responds → triggers channel B → triggers channel A → ...) are bounded by a configurable depth limit. Uses the existing `blocked` + `blocked_by` mechanism; no new concepts.
- Start narrow — Phase 0 validates the model. Phase 1 proves it. Then expand.
Appendix A: Project Structure
The library is a pure async Python package. REST API, WebSocket endpoints, MCP server, PostgresStore, and Redis pub/sub belong to the integration service (see Technical Requirements §11).
roomkit/
├── src/
│ └── roomkit/
│ ├── __init__.py # Public API exports
│ ├── py.typed # PEP 561 marker (typed package)
│ │
│ ├── core/ # Framework internals
│ │ ├── __init__.py
│ │ ├── framework.py # RoomKit class — entry point
│ │ ├── router.py # EventRouter — broadcast + permissions
│ │ ├── hooks.py # HookEngine — sync + async pipeline
│ │ └── locks.py # RoomLockManager — per-room locking
│ │
│ ├── models/ # Pydantic data models (no I/O)
│ │ ├── __init__.py
│ │ ├── room.py # Room, RoomStatus, RoomTimers
│ │ ├── event.py # RoomEvent, EventType, EventStatus, EventContent
│ │ ├── channel.py # ChannelBinding, Access, Visibility, ChannelOutput
│ │ ├── participant.py # Participant, IdentificationStatus
│ │ ├── identity.py # Identity, IdentityResult, IdentityHookResult
│ │ ├── hook.py # HookResult, InjectedEvent, HookTrigger
│ │ ├── task.py # Task, Observation
│ │ └── delivery.py # DeliveryResult, InboundResult
│ │
│ ├── channels/ # Channel ABC + built-in channels
│ │ ├── __init__.py
│ │ ├── base.py # Channel ABC, ChannelType, ChannelCapabilities
│ │ ├── sms.py # SMSChannel
│ │ ├── email.py # EmailChannel
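│ │ ├── whatsapp.py # WhatsAppChannel
│ │ ├── websocket.py # WebSocketChannel (real-time browser users)
│ │ ├── http.py # HTTPChannel (REST polling fallback)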
│ │ └── ai.py # AIChannel
│ │
│ ├── providers/ # Provider ABCs + implementations
│ │ ├── __init__.py
│ │ ├── sms/
│ │ │ ├── __init__.py
│ │ │ ├── base.py # SMSProvider ABC
│ │ │ ├── twilio.py # TwilioSMSProvider (extra: twilio or httpx)
│ │ │ ├── sinch.py # SinchSMSProvider (extra: httpx)
│ │ │ └── mock.py # MockSMSProvider (no deps)
│ │ ├── email/
│ │ │ ├── __init__.py
│ │ │ ├── base.py # EmailProvider ABC
│ │ │ ├── sendgrid.py # SendGridProvider (extra: httpx)
│ │ │ └── mock.py # MockEmailProvider (no deps)
│ │ ├── ai/
│ │ │ ├── __init__.py
│ │ │ ├── base.py # AIProvider ABC, AIContext, AIResponse
│ │ │ ├── pydantic_ai.py # PydanticAIProvider (extra: pydantic-ai)
│ │ │ ├── anthropic.py # AnthropicDirectProvider (extra: anthropic)
│ │ │ ├── openai.py # OpenAIDirectProvider (extra: openai)
│ │ │ └── mock.py # MockAIProvider (no deps)
│ │ └── whatsapp/
│ │ ├── __init__.py
│ │ ├── base.py # WhatsAppProvider ABC
│ │ └── mock.py # MockWhatsAppProvider (no deps)
│ │
│ ├── store/ # Conversation store
│ │ ├── __init__.py
│ │ ├── base.py # ConversationStore ABC
│ │ └── memory.py # InMemoryStore (no deps — testing + dev)
│ │
│ ├── identity/ # Identity resolution
│ │ ├── __init__.py
│ │ ├── base.py # IdentityResolver ABC
│ │ └── mock.py # MockIdentityResolver (no deps — testing)
│ │
│ └── config/ # Provider configs (plain Pydantic models)
│ ├── __init__.py
│ └── providers.py # TwilioConfig, SinchConfig, AnthropicConfig, etc.
│
└── tests/
├── conftest.py # Shared fixtures (InMemoryStore, MockProviders)
├── unit/
│ ├── test_models.py # Pydantic model validation
│ ├── test_router.py # Event routing + permissions
│ ├── test_hooks.py # Hook pipeline (sync + async)
│ ├── test_identity.py # Identity resolution + ambiguity
│ └── test_channels.py # Channel ABC contract tests
└── integration/
├── test_framework.py # Full pipeline: inbound → hooks → broadcast
├── test_use_case_1.py # Human <-> Human (cross-channel)
├── test_use_case_2.py # Human <-> AI
├── test_use_case_3.py # Human <-> Human + AI assistant
├── test_use_case_4.py # Human <-> Human + Observer
├── test_use_case_5.py # Dynamic channel management
├── test_use_case_6.py # SIN sensitivity scanning
└── test_use_case_7.py # Ambiguous identity resolution
What is NOT in the library:
| Missing from lib | Why | Where it lives |
|---|---|---|
| api/ directory | REST API is an integration concern | Integration service |
| ws.py | WebSocket endpoint needs a web framework | Integration service |
| mcp.py | MCP server needs fastmcp | Integration service |
| store/postgres.py | Needs SQLAlchemy + asyncpg | Integration service or separate package |
| store/migrations/ | Needs Alembic | Integration service |
| realtime.py | Needs Redis pub/sub or similar | Integration service |
| e2e tests | Need FastAPI TestClient, HTTP server | Integration service tests |
Appendix B: Class Hierarchy & Architecture
Complete class hierarchy showing how all pieces compose:
┌─────────────────────────────────────────────────────────────────────────────┐
│ FRAMEWORK (entry point) │
│ │
│ RoomKit │
│ ├── store: ConversationStore (ABC → InMemoryStore | PostgresStore)│
│ ├── channels: dict[str, Channel] (registered channel instances) │
│ ├── hooks: HookEngine (sync + async hook pipeline) │
│ ├── router: EventRouter (broadcast + permission enforcement) │
│ ├── realtime: RealtimeManager (WebSocket connections + event bus) │
│ ├── identity_resolver: IdentityResolver (ABC → pluggable resolution) │
│ ├── max_event_chain_depth: int = 10 (global safety limit for chains) │
│ │ │
│ │ Operations: │
│ ├── register_channel(channel) (global: add channel to framework) │
│ ├── create_room(org_id, metadata) (create conversation space) │
│ ├── attach_channel(room_id, ...) (connect channel to room) │
│ ├── detach_channel(room_id, ...) (disconnect from room) │
│ ├── mute / unmute / set_access / set_visibility (permission primitives) │
│ ├── update_room_metadata(...) (mutable room metadata) │
│ ├── update_binding_metadata(...) (mutable binding metadata) │
│ │ │
│ │ Event subscription: │
│ ├── hook(trigger, execution, priority) (register hook — decorator) │
│ ├── on(event_name) (subscribe to framework events) │
│ │ │
│ │ Note: REST API, WebSocket endpoints, and MCP server are NOT part of │
│ │ the library. The integrator wraps fw.* calls in their own web framework.│
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ CHANNEL (ABC — unified interface) │
│ │
│ Channel (ABC) │
│ ├── id: str │
│ ├── type: ChannelType (sms | email | websocket | ai | ...) │
│ ├── category: ChannelCategory (transport | intelligence | integr.) │
│ ├── direction: ChannelDirection (inbound | outbound | bidirectional) │
│ ├── media_types: list[ChannelMediaType] (text | audio | video) │
│ ├── capabilities: ChannelCapabilities (max_text_length, supports_*, ...) │
│ ├── info: dict[str, Any] (property — instance config info) │
│ │ │
│ │ Methods: │
│ ├── handle_inbound(raw) → InboundResult (INBOUND: webhook/WS arrives) │
│ ├── deliver(event, binding) → DeliveryResult (OUTBOUND: push event out) │
│ └── on_event(event, room) → ChannelOutput (READ: react to room event) │
│ │
│ Concrete channels: │
│ ├── SMSChannel(provider: SMSProvider) direction=BIDIRECTIONAL │
│ ├── EmailChannel(provider: EmailProvider) direction=BIDIRECTIONAL │
│ ├── WebSocketChannel() direction=BIDIRECTIONAL │
│ ├── HTTPChannel() direction=BIDIRECTIONAL │
│ ├── WhatsAppChannel(provider: WhatsAppProvider) direction=BIDIRECTIONAL │
│ ├── AIChannel(provider: AIProvider) direction=BIDIRECTIONAL │
│ └── CustomChannel(...) direction=varies │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ PROVIDERS (swappable implementations) │
│ │
│ Provider ABCs (one per channel type that needs external services): │
│ │
│ SMSProvider (ABC) │
│ ├── name: str │
│ ├── from_number: str │
│ ├── send_sms(to, body) → ProviderResult │
│ ├── parse_webhook(payload) → InboundMessage │
│ │ │
│ │ Implementations: │
│ │ ├── TwilioSMSProvider(config: TwilioConfig) │
│ │ ├── SinchSMSProvider(config: SinchConfig) │
│ │ └── MockSMSProvider() (testing) │
│ │ │
│ EmailProvider (ABC) │
│ ├── name: str │
│ ├── send_email(to, subject, body) → ProviderResult │
│ ├── parse_inbound(payload) → InboundEmail │
│ │ │
│ │ Implementations: │
│ │ ├── SendGridProvider(config: SendGridConfig) │
│ │ ├── SMTPProvider(config: SMTPConfig) │
│ │ └── MockEmailProvider() │
│ │ │
│ AIProvider (ABC) │
│ ├── name: str │
│ ├── model_name: str │
│ ├── generate(messages, context: AIContext) → AIResponse │
│ │ │
│ │ Implementations: │
│ │ ├── PydanticAIProvider(agent: pydantic_ai.Agent) (recommended) │
│ │ ├── AnthropicDirectProvider(client, model) │
│ │ ├── OpenAIDirectProvider(client, model) │
│ │ └── MockAIProvider(responses: list[str]) (testing) │
│ │ │
│ WhatsAppProvider (ABC) │
│ │ Implementations: │
│ │ └── MetaWhatsAppProvider(config: MetaConfig) │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ PROVIDER CONFIG (typed configuration) │
│ │
│ Each provider has a typed Pydantic config: │
│ │
│ TwilioConfig │
│ ├── account_sid: str │
│ ├── auth_token: SecretStr │
│ ├── from_number: str │
│ └── webhook_url: str | None │
│ │
│ SinchConfig │
│ ├── app_id: str │
│ ├── app_secret: SecretStr │
│ └── from_number: str │
│ │
│ SendGridConfig │
│ ├── api_key: SecretStr │
│ ├── from_email: str │
│ └── reply_to: str | None │
│ │
│ PydanticAIConfig │
│ ├── model: str (e.g., "anthropic:claude-sonnet-4-5")│
│ ├── instructions: str │
│ └── temperature: float = 0.7 │
│ │
│ AnthropicConfig │
│ ├── api_key: SecretStr │
│ ├── model: str = "claude-sonnet-4-5" │
│ ├── max_tokens: int = 4096 │
│ └── temperature: float = 0.7 │
│ │
│ Why typed configs? │
│ - Pydantic validates at startup (fail fast on bad config) │
│ - SecretStr prevents accidental logging of credentials │
│ - IDE autocompletion for provider setup │
│ - Can be loaded from env vars, YAML, JSON, or Python │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ MODELS (Pydantic data models) │
│ │
│ Room │
│ ├── id: str │
│ ├── organization_id: str │
│ ├── status: RoomStatus (active | paused | closed | archived)│
│ ├── channels: list[ChannelBinding] │
│ ├── participants: list[Participant] │
│ ├── timeline: list[RoomEvent] │
│ ├── metadata: dict[str, Any] (mutable via API) │
│ ├── created_at / updated_at: datetime │
│ │ │
│ ChannelBinding (per-room channel permissions) │
│ ├── channel_id: str │
│ ├── room_id: str │
│ ├── access: Access (read | write | read_write) │
│ ├── muted: bool │
│ ├── write_visibility: Visibility (all | channels([...]) | internal) │
│ ├── participant_id: str | None │
│ ├── metadata: dict[str, Any] (mutable via API) │
│ ├── attached_at: datetime │
│ │ │
│ RoomEvent │
│ ├── id: str │
│ ├── room_id: str │
│ ├── index: int (sequential per room) │
│ ├── type: EventType │
│ ├── status: EventStatus (delivered | blocked | failed) │
│ ├── blocked_by: str | None (hook name, if status=blocked) │
│ ├── source_channel: str │
│ ├── participant_id: str | None │
│ ├── content: EventContent (TextContent | AudioContent | ...) │
│ ├── source: EventSource (channel-specific data + raw_payload)│
│ ├── visibility: Visibility │
│ ├── chain_depth: int (0 = inbound; +1 per re-entry) │
│ ├── correlation_id: str | None │
│ ├── metadata: dict[str, Any] │
│ ├── timestamp: datetime │
│ │ │
│ HookResult (what a sync hook returns) │
│ ├── action: "allow" | "block" | "modify" │
│ ├── reason: str | None (why blocked/modified) │
│ ├── modified_event: RoomEvent | None (only for action=modify) │
│ ├── inject: list[InjectedEvent] (targeted replacement events) │
│ ├── tasks: list[Task] (side effects) │
│ └── observations: list[Observation] (side effects) │
│ │
│ InjectedEvent (targeted event from a hook) │
│ ├── content: EventContent (what to send) │
│ ├── target_channel: str | None (one specific channel) │
│ ├── target_channels: list[str] | None (multiple specific channels) │
│ ├── visibility: Visibility = all (fallback if no target) │
│ └── source_label: str = "system" (identifies injected event source)│
│ │
│ ChannelOutput (what a channel produces when processing an event) │
│ ├── events: list[RoomEvent] (→ subject to permissions) │
│ ├── tasks: list[Task] (→ always stored, not subject to mute)│
│ ├── observations: list[Observation] (→ always stored) │
│ └── metadata_updates: dict (→ enrich Room metadata) │
│ Note: Routing suggestions (escalate, etc.) are expressed as Tasks. │
│ │
│ DeliveryResult (what deliver() returns) │
│ ├── status: sent | queued | failed │
│ ├── provider_message_id: str | None │
│ ├── error: str | None │
│ └── metadata: dict[str, Any] (provider-specific delivery info) │
│ │
│ Participant │
│ ├── id: str │
│ ├── role: ParticipantRole (end_user | agent | system) │
│ ├── identity: Identity | None (None if pending) │
│ ├── identification: IdentificationStatus (identified | pending) │
│ ├── candidates: list[Identity] | None (possible identities, if pending) │
│ ├── connected_via: list[str] (channel IDs) │
│ ├── display_name: str | None (resolved name or fallback) │
│ ├── status: ParticipantStatus (active | idle | left) │
│ ├── resolved_at: datetime | None │
│ ├── resolved_by: str | None ("auto" | "advisor" | hook name) │
│ └── joined_at: datetime │
│ │
│ Identity │
│ ├── id: str │
│ ├── organization_id: str │
│ ├── display_name: str | None │
│ ├── channel_addresses: dict[ChannelType, list[str]] │
│ ├── external_id: str | None (CRM contact ID) │
│ └── metadata: dict[str, Any] │
│ │
│ IdentityResult (what IdentityResolver.resolve() returns) │
│ ├── status: "resolved" | "ambiguous" | "unknown" │
│ ├── participant: Participant | None (if resolved) │
│ ├── candidates: list[Identity] (if ambiguous, 2+) │
│ ├── address: str (raw channel address) │
│ └── channel_type: ChannelType │
│ │
│ IdentityHookResult (what on_identity_ambiguous/unknown hook returns) │
│ ├── resolved(identity) (we know who — proceed) │
│ ├── pending(display_name, candidates) (advisor resolves later) │
│ ├── challenge(inject) (ask sender to self-identify) │
│ └── reject(reason) (do not create room/participant) │
└─────────────────────────────────────────────────────────────────────────────┘
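The `ChannelOutput` note above applies the "muting silences the voice, not the brain" rule: events obey permissions, while tasks and observations always flow. The sketch below shows how a router might apply one output. It is hypothetical and stdlib-only. The dataclasses are cut-down stand-ins for the RFC's models, plain lists stand in for the ConversationStore, and only the mute check is modeled.

```python
from dataclasses import dataclass, field


@dataclass
class ChannelOutput:
    events: list[str] = field(default_factory=list)        # candidate room messages
    tasks: list[str] = field(default_factory=list)         # side effects
    observations: list[str] = field(default_factory=list)  # side effects


@dataclass
class RoomState:
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)


def apply_output(room: RoomState, output: ChannelOutput, muted: bool) -> list[str]:
    """Store side effects unconditionally; return only the events to broadcast."""
    room.tasks.extend(output.tasks)                # path 2: always flows
    room.observations.extend(output.observations)
    return [] if muted else list(output.events)    # path 1: subject to mute


room = RoomState()
out = ChannelOutput(events=["Hi!"], tasks=["escalate"])
print(apply_output(room, out, muted=True))  # []
print(room.tasks)                           # ['escalate']
```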
┌─────────────────────────────────────────────────────────────────────────────┐
│ ENUMS (value types) │
│ │
│ ChannelType: sms | email | websocket | http | whatsapp | ai | │
│ custom │
│ ChannelCategory: transport | intelligence | integration │
│ ChannelDirection: inbound | outbound | bidirectional │
│ ChannelMediaType: text | audio | video │
│ EventStatus: delivered | blocked | failed │
│ IdentificationStatus: identified | pending │
│ Access: read | write | read_write │
│ Visibility: all | channels(list[str]) | internal │
│ EventType: message | typing | read_receipt | delivery_status | │
│ channel_attached | channel_detached | │
│ channel_muted | channel_unmuted | channel_updated | │
│ participant_joined | participant_left | │
│ participant_identified | task_created | │
│ observation | system | custom │
│ HookTrigger: before_broadcast | after_broadcast | │
│ on_channel_attached | on_channel_detached | │
│ on_channel_muted | on_channel_unmuted | │
│ on_room_created | on_room_closed | │
│ on_task_created | on_identity_ambiguous | │
│ on_identity_unknown | on_participant_identified | │
│ on_error │
│ ParticipantRole: end_user | agent | system │
│ ParticipantStatus: active | idle | left │
│ RoomStatus: active | paused | closed | archived │
│ DeliveryMode: instant | async | batch │
└─────────────────────────────────────────────────────────────────────────────┘
How everything composes — from config to running framework:
from roomkit import RoomKit
from roomkit.channels.sms import SMSChannel
from roomkit.channels.websocket import WebSocketChannel
from roomkit.channels.ai import AIChannel
from roomkit.providers.sms import TwilioSMSProvider, TwilioConfig
from roomkit.providers.ai import PydanticAIProvider
from roomkit.store import InMemoryStore  # PostgresStore ships with the integration service, not the library
from pydantic_ai import Agent
# RoomContext and FrameworkEvent (used in the hook signatures below) are roomkit models.
# CRMIdentityResolver, crm_client and dashboard are integrator-side objects, not part of roomkit.

# 1. Provider configs (typed, validated at startup)
twilio_config = TwilioConfig(
    account_sid="AC...",
    auth_token="secret",  # SecretStr: safe from accidental logging
    from_number="+15551234567",
)

# 2. Providers (created from configs)
sms_provider = TwilioSMSProvider(config=twilio_config)
ai_provider = PydanticAIProvider(
    agent=Agent(model="anthropic:claude-sonnet-4-5", instructions="..."),
)

# 3. Channels (created from providers)
sms_channel = SMSChannel(provider=sms_provider)                # direction=BIDIRECTIONAL
ws_channel = WebSocketChannel()                                # direction=BIDIRECTIONAL
ai_channel = AIChannel(provider=ai_provider, name="support")  # direction=BIDIRECTIONAL

# 4. RoomKit (orchestrates everything)
fw = RoomKit(
    store=InMemoryStore(),  # or a PostgresStore provided by the integration service
    identity_resolver=CRMIdentityResolver(crm_client),  # integrator's IdentityResolver implementation
)
fw.register_channel(sms_channel)
fw.register_channel(ws_channel)
fw.register_channel(ai_channel)

# 5. Hooks (integrator's business logic)
@fw.hook(trigger="on_room_created", execution="async")
async def setup_room(room: RoomContext):
    await fw.attach_channel(
        room.id, "support",
        access="read_write", visibility="all",
        metadata={"language": "fr"},
    )

# 6. RoomKit events (real-time, global)
@fw.on("room_created")
async def notify_dashboard(event: FrameworkEvent):
    await dashboard.broadcast(event)

# 7. Integration service (NOT part of the library)
# The integrator wraps fw.* calls in their own web framework:
from fastapi import FastAPI, Request, Response
from pydantic import BaseModel

app = FastAPI()


class CreateRoomRequest(BaseModel):
    organization_id: str


@app.post("/webhooks/sms/twilio")
async def twilio_webhook(request: Request):
    payload = await request.form()  # Twilio posts form-encoded data
    await fw.process_inbound("sms_twilio_1", dict(payload))
    return Response(status_code=200)


@app.post("/api/v1/rooms")
async def create_room(body: CreateRoomRequest):
    return await fw.create_room(organization_id=body.organization_id)

# MCP and WebSocket endpoints are also mounted by the integration service.
Appendix C: Evolution Summary¶
| Version | Key Insight |
|---|---|
| v1 | AI is a layer above channels. Adapters normalize, AI processes, adapters send. |
| v2 | AI is a channel (same abstraction as SMS). Observer/hybrid modes. Tasks. |
| v3 | Room-first. Human ↔ Human is the base case. Hooks for blocking. Provider abstraction. Dynamic channels. REST + MCP. |
| v4 | Primitives over opinions. Replace ChannelMode enum with access/mute/visibility primitives. Two output paths (events vs side effects). Framework provides mechanics, integrator provides logic. |
| v5 | AI provider abstraction + channel metadata + media types. AI follows same provider pattern as SMS (AIProvider ABC → PydanticAIProvider, AnthropicDirectProvider, etc.). Three-level metadata (channel instance, binding, event). Channel media types (text, audio, video) for future-proofing. |
| v6 | Channel direction + real-time + class hierarchy. Channels declare direction (inbound/outbound/bidirectional). Three methods: handle_inbound, deliver, on_event. Real-time event system (room events + framework events). WebSocket-native with typing indicators. Typed ProviderConfig. Complete class hierarchy. |
| v7 | CPaaS-validated. Research of Twilio, Sinch, Vonage, MessageBird, Infobip. Added: sequential event indexing, read horizon tracking, room timers (auto-transitions), correlation ID, content transcoding, channel fallback, template support. Gap analysis documented in cpaas-comparison.md. |
| v8 | Hook injection + event status + use case validation. Sync hooks can now block AND inject targeted replacement events to specific channels. Events carry status (delivered/blocked/failed) and blocked_by for audit trails. HookResult extended with inject, tasks, observations. New InjectedEvent model for targeted delivery. Use Case 6: SIN sensitivity scanning — validates blocking, targeted responses, and compliance logging. |
| v9 | Identity resolution pipeline + ambiguity handling. IdentityResolver ABC returns IdentityResult with resolved/ambiguous/unknown status. New hook triggers: on_identity_ambiguous, on_identity_unknown, on_participant_identified. Participant model extended with identification status (identified/pending), candidates, resolved_at, resolved_by. IdentityHookResult with 4 strategies: resolved, pending, challenge, reject. REST API: POST /rooms/{id}/participants/{pid}/resolve. Use Case 7: shared family phone number — validates ambiguous identity, advisor manual resolution, audit trail. |
| v10 | Library/service boundary + consistency fixes. Aligned project structure with technical requirements (library has no web framework, API, MCP, or PostgresStore). Removed RoutingDecision from ChannelOutput (use Tasks). Added RoomRouter ABC for pluggable inbound room routing. Documented transcoding pipeline location (Event Router before deliver). Fixed RoomLockManager unbounded growth (LRU eviction). Added idempotency (webhook dedup via provider_message_id, client dedup via correlation_id). Resolved stale open questions (Q1, Q3, Q4, Q5). Updated CPaaS comparison to reflect implemented features. |
| v11 (current) | Event chain depth limit + Quick Start + Named patterns + Contributor guide. Added chain_depth: int to RoomEvent (0 for human/inbound, incremented on channel re-entry). Configurable max_event_chain_depth on Framework (default 10). When depth exceeded, event is blocked via existing status=blocked + blocked_by="event_chain_depth_limit" mechanism — no new enums or status types. Observation emitted for integrator visibility. Side effects still flow (consistent with mute). Added 5-minute Quick Start example (§2). Named patterns table (Direct, Assistant, Observer, Muted, Internal) in §3.3. Contributor guide skeleton (Appendix D). All open questions resolved: Q2 (let both respond), Q6 (roomkit), Q7 (independent open source). |
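The v11 chain-depth rule can be shown in isolation: two AI channels that keep answering each other are stopped once the depth limit is exceeded. The following is a minimal sketch using a stand-in RoomEvent with only the fields involved. `reenter` and `ping_pong` are hypothetical helpers. The sketch assumes "exceeded" means `chain_depth > max_event_chain_depth`, so with the default of 10, depth 10 is still delivered and depth 11 is blocked. The observation and side effects that v11 also describes are not modelled here.

```python
from dataclasses import dataclass, replace
from typing import Optional

MAX_EVENT_CHAIN_DEPTH = 10  # v11 default for max_event_chain_depth


@dataclass(frozen=True)
class RoomEvent:
    """Stand-in carrying only the fields this sketch needs."""
    text: str
    chain_depth: int = 0  # 0 for human/inbound events
    status: str = "delivered"
    blocked_by: Optional[str] = None


def reenter(parent: RoomEvent, text: str, max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> RoomEvent:
    """Build the event a channel emits in reaction to `parent`."""
    child = RoomEvent(text=text, chain_depth=parent.chain_depth + 1)
    if child.chain_depth > max_depth:
        # Reuse the existing blocked status rather than a new enum value.
        return replace(child, status="blocked", blocked_by="event_chain_depth_limit")
    return child


def ping_pong(max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> tuple[int, RoomEvent]:
    """Two AI channels answering each other; return (delivered replies, blocked event)."""
    event = RoomEvent(text="Hi")  # human message, depth 0
    delivered = 0
    while True:
        event = reenter(event, f"reply {delivered + 1}", max_depth)
        if event.status == "blocked":
            return delivered, event
        delivered += 1
```

With the default limit, `ping_pong()` returns ten delivered replies and a blocked eleventh event.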
Appendix D: Contributor Guide¶
This appendix prepares roomkit for open-source contribution. It covers the most common extension points (custom SMS providers, custom AI providers, and custom channels) and the tests each one requires.
D.1 Implementing a Custom SMSProvider¶
A custom SMS provider connects a new SMS service (e.g., Vonage, AWS SNS) to roomkit.
Step 1: Implement the ABC
# roomkit/providers/sms/vonage.py
import httpx

from roomkit.providers.sms.base import SMSProvider, InboundMessage, ProviderResult


class VonageSMSProvider(SMSProvider):
    name = "vonage"

    def __init__(self, api_key: str, api_secret: str, from_number: str):
        self.api_key = api_key
        self.api_secret = api_secret
        self.from_number = from_number

    async def send_sms(self, to: str, body: str) -> ProviderResult:
        # Call the Vonage SMS API via httpx
        async with httpx.AsyncClient() as client:
            resp = await client.post("https://rest.nexmo.com/sms/json", json={
                "api_key": self.api_key,
                "api_secret": self.api_secret,
                "from": self.from_number,
                "to": to,
                "text": body,
            })
        data = resp.json()
        # Vonage reports a per-message status; "0" means the message was accepted.
        message = data["messages"][0]
        return ProviderResult(
            status="sent" if message["status"] == "0" else "failed",
            provider_message_id=message.get("message-id"),
        )

    async def parse_webhook(self, payload: dict) -> InboundMessage:
        # Vonage inbound webhook: "msisdn" is the sender, "to" is our number
        return InboundMessage(
            from_address=payload["msisdn"],
            to_address=payload["to"],
            text=payload.get("text", ""),
            raw_payload=payload,
        )
Step 2: Add a typed config
# roomkit/config/providers.py
from pydantic import BaseModel, SecretStr


class VonageConfig(BaseModel):
    api_key: str
    api_secret: SecretStr  # masked in repr() and logs
    from_number: str
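The compose example builds the Twilio provider from its config. A Vonage config can feed the provider from Step 1 the same way. The sketch below shows that SecretStr stays masked when printed and is unwrapped only at construction time. It is a minimal sketch under one assumption: `provider_kwargs` is a hypothetical helper, needed here because `VonageSMSProvider` takes plain strings.

```python
from pydantic import BaseModel, SecretStr


class VonageConfig(BaseModel):
    api_key: str
    api_secret: SecretStr
    from_number: str


def provider_kwargs(config: VonageConfig) -> dict[str, str]:
    """Hypothetical helper: unwrap the secret only where the provider needs it."""
    return {
        "api_key": config.api_key,
        "api_secret": config.api_secret.get_secret_value(),
        "from_number": config.from_number,
    }


config = VonageConfig(api_key="key", api_secret="s3cret", from_number="+15551234567")
print(config)  # the secret is rendered masked, never as "s3cret"
# VonageSMSProvider(**provider_kwargs(config)) would then build the provider from Step 1.
```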
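Step 3: Add the extra dependency
Declare the provider's third-party dependencies (httpx in this example) as an optional extra in pyproject.toml, so that users who do not need Vonage do not install them.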
D.2 Implementing a Custom AIProvider¶
A custom AI provider connects a new LLM service or framework to roomkit.
Step 1: Implement the ABC
# roomkit/providers/ai/ollama.py
import httpx

from roomkit.providers.ai.base import AIProvider, AIContext, AIMessage, AIResponse


class OllamaProvider(AIProvider):
    name = "ollama"
    model_name: str

    def __init__(self, model: str, base_url: str = "http://localhost:11434"):
        self.model_name = model
        self.base_url = base_url

    async def generate(
        self,
        messages: list[AIMessage],
        context: AIContext,
    ) -> AIResponse:
        # This minimal example ignores `context`; a real provider adapts it (see Key contract).
        # Local models can be slow, so raise httpx's default 5-second timeout.
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(f"{self.base_url}/api/chat", json={
                "model": self.model_name,
                "messages": [
                    {"role": m.role, "content": m.content} for m in messages
                ],
                # /api/chat streams NDJSON chunks by default; request a single JSON object.
                "stream": False,
            })
            resp.raise_for_status()
        data = resp.json()
        return AIResponse(text=data["message"]["content"])
Key contract:
- generate() receives the full conversation history as list[AIMessage] and an AIContext with room metadata, target channel capabilities, and participants.
- Return an AIResponse. Optionally include tasks and observations if the AI produces side effects.
- The provider is responsible for adapting the context to the LLM's API format.
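One way to adapt the context is to fold it into a leading system message and then append the history in the LLM's chat format. The following is a minimal sketch that uses stand-in AIMessage/AIContext dataclasses. Their field names (`room_metadata`, `target_max_text_length`, `participants`) and the `to_chat_messages` helper are hypothetical, not roomkit's actual models. The output shape matches the role/content messages used by the Ollama example above.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AIMessage:
    role: str  # "user" or "assistant"
    content: str


@dataclass
class AIContext:
    room_metadata: dict[str, Any] = field(default_factory=dict)
    target_max_text_length: Optional[int] = None  # from the target channel's capabilities
    participants: list[str] = field(default_factory=list)


def to_chat_messages(messages: list[AIMessage], context: AIContext) -> list[dict[str, str]]:
    """Turn roomkit-style context into a system prompt followed by the history."""
    hints: list[str] = []
    language = context.room_metadata.get("language")
    if language:
        hints.append(f"Reply in this language: {language}.")
    if context.target_max_text_length:
        hints.append(f"Keep each reply under {context.target_max_text_length} characters.")
    if context.participants:
        hints.append("Participants: " + ", ".join(context.participants) + ".")
    chat = [{"role": "system", "content": " ".join(hints)}] if hints else []
    chat.extend({"role": m.role, "content": m.content} for m in messages)
    return chat
```

Deriving the length hint from the target channel lets the same provider produce short replies for an SMS binding and longer ones for a browser, without any channel-specific code in the provider.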
D.3 Implementing a Custom Channel¶
For channels beyond SMS/Email/WebSocket/AI (e.g., Slack, Discord, voice).
# roomkit/channels/slack.py
from typing import Any

from roomkit.channels.base import Channel, ChannelType, ChannelCategory, ChannelDirection
# ChannelMediaType, ChannelCapabilities, ChannelBinding, ChannelOutput, RoomEvent,
# RoomContext, InboundResult and DeliveryResult are roomkit types (import paths omitted here).


class SlackChannel(Channel):
    type = ChannelType.CUSTOM
    category = ChannelCategory.TRANSPORT
    direction = ChannelDirection.BIDIRECTIONAL
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, bot_token: str, id: str = "slack"):
        self.id = id
        self.bot_token = bot_token
        self.capabilities = ChannelCapabilities(
            max_text_length=40000,  # Slack truncates message text after 40,000 characters
            supports_rich_text=True,
            supports_buttons=True,
            supports_threading=True,
            supports_media=True,
        )

    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        # Parse the Slack event payload into a RoomEvent
        ...

    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        # Post the message to a Slack channel or DM via the Slack API
        ...

    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        # Transport channels usually return empty output
        return ChannelOutput()
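The parsing step inside handle_inbound could look like the sketch below. It handles the Slack Events API's one-time `url_verification` challenge and `event_callback` message events. It also skips bot-authored messages, including the channel's own deliveries, so they do not re-enter the room as new inbound events. This is a minimal sketch: `ParsedSlackMessage` and `parse_slack_event` are hypothetical intermediates that would precede building a RoomEvent. Dropping every message with a `subtype` is a deliberate simplification, and it also discards file shares and edits.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ParsedSlackMessage:
    """Hypothetical intermediate, built before converting to a RoomEvent."""
    channel: str
    user: str
    text: str
    ts: str
    thread_ts: Optional[str] = None


def parse_slack_event(payload: dict[str, Any]) -> tuple[Optional[str], Optional[ParsedSlackMessage]]:
    """Return (challenge, message); at most one of them is not None."""
    if payload.get("type") == "url_verification":
        return payload["challenge"], None  # echo this back to Slack once, at setup
    if payload.get("type") != "event_callback":
        return None, None
    event = payload.get("event", {})
    if event.get("type") != "message":
        return None, None
    # Skip bot posts (our own deliveries included) and subtyped messages
    # such as edits or deletions. This also drops file shares (simplification).
    if event.get("bot_id") or event.get("subtype"):
        return None, None
    return None, ParsedSlackMessage(
        channel=event["channel"],
        user=event["user"],
        text=event.get("text", ""),
        ts=event["ts"],
        thread_ts=event.get("thread_ts"),
    )
```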
D.4 Required Tests for a New Provider¶
Every provider must ship with tests that validate the ABC contract. Use the following pattern:
# tests/unit/test_providers_vonage.py
# Requires pytest-asyncio (async tests) and pytest-httpx (the httpx_mock fixture).
import pytest

from roomkit.providers.sms.vonage import VonageSMSProvider

pytestmark = pytest.mark.asyncio


class TestVonageSMSProvider:
    """Contract tests for VonageSMSProvider."""

    @pytest.fixture
    def provider(self):
        return VonageSMSProvider(
            api_key="test", api_secret="test", from_number="+15550000000",
        )

    async def test_send_sms_returns_provider_result(self, provider, httpx_mock):
        """send_sms() must return ProviderResult with status and provider_message_id."""
        httpx_mock.add_response(json={"messages": [{"status": "0", "message-id": "abc123"}]})
        result = await provider.send_sms("+15551234567", "Hello")
        assert result.status == "sent"
        assert result.provider_message_id == "abc123"

    async def test_send_sms_failure(self, provider, httpx_mock):
        """send_sms() must return status=failed on provider error."""
        httpx_mock.add_response(json={"messages": [{"status": "1", "error-text": "Throttled"}]})
        result = await provider.send_sms("+15551234567", "Hello")
        assert result.status == "failed"

    async def test_parse_webhook(self, provider):
        """parse_webhook() must return InboundMessage with from/to/text."""
        msg = await provider.parse_webhook({
            "msisdn": "+15551234567",
            "to": "+15559876543",
            "text": "Bonjour",
        })
        assert msg.from_address == "+15551234567"
        assert msg.to_address == "+15559876543"
        assert msg.text == "Bonjour"
Checklist for new providers:
- [ ] Implements all abstract methods from the ABC
- [ ] All methods are async def
- [ ] Has a typed config (pydantic.BaseModel with SecretStr for credentials)
- [ ] Has a mock variant (no external deps) for integration tests
- [ ] Unit tests cover: success path, failure path, webhook parsing
- [ ] Added as optional extra in pyproject.toml
- [ ] HTTP calls use httpx.AsyncClient (not requests)
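The "mock variant" item on the checklist can be as small as an in-memory recorder. The following is a minimal sketch that exposes the same two methods as the Vonage provider, so integration tests can run with no network and no credentials. Assumptions: ProviderResult and InboundMessage are replaced by stand-in dataclasses with the fields used in D.1, the class does not subclass SMSProvider (a real one would), and the `fail_for` parameter and the "from"/"to"/"text" webhook keys are hypothetical.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


# Stand-ins for roomkit's ProviderResult / InboundMessage (fields as used in D.1).
@dataclass
class ProviderResult:
    status: str
    provider_message_id: Optional[str] = None


@dataclass
class InboundMessage:
    from_address: str
    to_address: str
    text: str
    raw_payload: dict[str, Any] = field(default_factory=dict)


class MockSMSProvider:
    """Records outbound SMS in memory; no network, no credentials."""

    name = "mock"

    def __init__(self, fail_for: frozenset[str] = frozenset()):
        self.sent: list[tuple[str, str]] = []
        self.fail_for = fail_for  # destination numbers that simulate a provider error
        self._counter = 0

    async def send_sms(self, to: str, body: str) -> ProviderResult:
        if to in self.fail_for:
            return ProviderResult(status="failed")
        self._counter += 1
        self.sent.append((to, body))
        return ProviderResult(status="sent", provider_message_id=f"mock-{self._counter}")

    async def parse_webhook(self, payload: dict[str, Any]) -> InboundMessage:
        return InboundMessage(
            from_address=payload["from"],
            to_address=payload["to"],
            text=payload.get("text", ""),
            raw_payload=payload,
        )


async def _demo() -> None:
    provider = MockSMSProvider()
    result = await provider.send_sms("+15551234567", "Hello")
    print(result, provider.sent)


if __name__ == "__main__":
    asyncio.run(_demo())
```

Because the mock returns deterministic message IDs, tests can assert on provider_message_id as well as on what was "sent".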