RFC: RoomKit — Multi-Channel Conversation Framework

Document Info

Status          Draft - Under Discussion (v11)
Author          Sylvain Boily
Contributions   TchatNSign
Created         2026-01-27
Last Updated    2026-01-27

1. Vision

A Python framework for multi-channel conversations between humans, AI, and programs — across SMS, Email, WhatsApp, HTTP/WebSocket, and any custom channel.

The framework manages Rooms (conversations) where Channels connect. A Channel can be a human on SMS, a human in a browser, an AI agent, or a program that observes and enforces rules. Channels can be added, removed, muted, and reconfigured dynamically. Hooks intercept events for filtering, blocking, or enrichment.

The framework provides primitives. Business logic belongs to the integrator.

Use Cases

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 1: Human ↔ Human (cross-channel)                          │
│                                                                     │
│   [User A: SMS] ←──── Room ────→ [User B: Browser/HTTP]           │
│                                                                     │
│ A customer on SMS talks with an advisor in a web dashboard.         │
│ Each sees messages in their channel. The Room bridges them.         │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 2: Human ↔ AI                                              │
│                                                                     │
│   [User: SMS] ←──── Room ────→ [AI: pydantic-ai]                  │
│                                                                     │
│ A customer on SMS talks with an AI agent.                           │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 3: Human ↔ Human + AI assistant                            │
│                                                                     │
│   [User: SMS] ←──── Room ────→ [Advisor: Browser]                  │
│                        ↑                                             │
│                        └──→ [AI: suggestions visible to advisor]   │
│                                                                     │
│ Human conversation. AI is added to help the advisor.                │
│ AI writes with visibility restricted to agent channels only.        │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 4: Human ↔ Human + Observer (rules/compliance)             │
│                                                                     │
│   [User: SMS] ←──── Room ────→ [Advisor: Browser]                  │
│                        ↑                                             │
│                        └──→ [Observer: blocks sensitive info]       │
│                                                                     │
│ A sync hook watches events. If someone sends a SIN, the hook        │
│ BLOCKS the message before it reaches anyone. The observer channel   │
│ (read-only) can also produce tasks and observations as side effects.│
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 5: Dynamic channel management                               │
│                                                                     │
│   [User: SMS] ←── Room ──→ [AI: read_write, visibility=all]       │
│                     │                                                │
│                     │  (AI can't help → escalate)                    │
│                     │                                                │
│   [User: SMS] ←── Room ──→ [AI: muted] + [Advisor: Browser]       │
│                     │                                                │
│                     │  (Advisor wants AI help)                       │
│                     │                                                │
│   [User: SMS] ←── Room ──→ [AI: visibility=agents] + [Advisor]    │
│                                                                     │
│ Channels are attached/detached/muted/reconfigured dynamically.      │
│ The framework provides the primitives. Business logic decides when. │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 6: Sensitivity scanning — block + targeted responses        │
│                                                                     │
│   [Customer: SMS] ←── Room ──→ [Advisor: WebSocket]                │
│                         ↑                                           │
│                  sync hook: sensitivity_scanner                      │
│                                                                     │
│ Customer sends SIN (Social Insurance Number) via SMS.               │
│ A sync hook detects the sensitive data BEFORE broadcast.            │
│ The hook BLOCKS the original message and INJECTS:                   │
│   → to SMS (customer): "Your message was blocked — SIN not allowed" │
│   → to WebSocket (advisor): "Customer tried to send sensitive data" │
│ The original message is stored in timeline with status=BLOCKED      │
│ (audit trail) but never delivered to the advisor.                   │
│ An observation is produced for compliance logging.                  │
└─────────────────────────────────────────────────────────────────────┘
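
The detection step in Use Case 6 can be sketched as a plain function. This is a minimal illustration, not the RoomKit hook API: HookDecision, sensitivity_scanner, and the "sms"/"websocket" keys are hypothetical names, and the Luhn check is an added assumption used here to reduce false positives on arbitrary nine-digit numbers.

```python
from __future__ import annotations

import re
from dataclasses import dataclass, field

# Nine digits, optionally grouped 3-3-3 with spaces or dashes.
SIN_PATTERN = re.compile(r"\b\d{3}[ -]?\d{3}[ -]?\d{3}\b")


def passes_luhn(digits: str) -> bool:
    """Luhn checksum, which Canadian SINs are designed to satisfy."""
    total = 0
    for position, char in enumerate(reversed(digits)):
        value = int(char)
        if position % 2 == 1:  # double every second digit from the right
            value *= 2
            if value > 9:
                value -= 9
        total += value
    return total % 10 == 0


def contains_sin(text: str) -> bool:
    """True if the text holds a nine-digit group that passes the Luhn check."""
    for match in SIN_PATTERN.finditer(text):
        digits = re.sub(r"\D", "", match.group())
        if passes_luhn(digits):
            return True
    return False


@dataclass
class HookDecision:  # hypothetical return type, not the RoomKit API
    action: str  # "allow" or "block"
    reason: str | None = None
    inject: dict[str, str] = field(default_factory=dict)  # channel -> notice


def sensitivity_scanner(body: str) -> HookDecision:
    """Block the message and prepare one targeted notice per channel."""
    if not contains_sin(body):
        return HookDecision(action="allow")
    return HookDecision(
        action="block",
        reason="sensitive_data:sin",
        inject={
            "sms": "Your message was blocked: SIN not allowed",
            "websocket": "Customer tried to send sensitive data",
        },
    )
```

In a real integration the decision would be returned from a sync hook so the framework can store the original event as blocked and deliver the injected notices.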

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 7: Ambiguous identity — shared phone number (family)        │
│                                                                     │
│   [SMS: +15551234567] ←── Room ──→ [Advisor: WebSocket]            │
│          │                                                          │
│          ├── Marie Tremblay (mother) ← previous conversation        │
│          ├── Jean Tremblay (father)                                 │
│          └── Sophie Tremblay (daughter)                             │
│                                                                     │
│ CRM has 3 contacts with the same phone number (family).             │
│ SMS arrives — IdentityResolver returns "ambiguous" (3 candidates).  │
│ A sync hook (on_identity_ambiguous) decides what to do:             │
│   → Strategy A: Create room with pending participant, advisor       │
│     sees candidates and picks one manually.                         │
│   → Strategy B: Auto-resolve from context (most recent room).       │
│   → Strategy C: Challenge — ask customer to self-identify via SMS.  │
│ Framework provides the pipeline. Integrator picks the strategy.     │
└─────────────────────────────────────────────────────────────────────┘
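
A rough sketch of the resolution step and of Strategy B follows. Candidate, IdentityResult, resolve_by_phone, and pick_most_recent are hypothetical names chosen for illustration; the RFC only states that the resolver can return "ambiguous" with several candidates, so the "resolved" and "unknown" statuses are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Candidate:
    participant_id: str
    name: str
    last_room_at: datetime | None = None  # when this contact last had a room


@dataclass
class IdentityResult:
    status: str  # "resolved", "ambiguous", or "unknown" (assumed values)
    candidates: list[Candidate]


def resolve_by_phone(phone: str, crm: dict[str, list[Candidate]]) -> IdentityResult:
    """Look up a phone number in a CRM-like mapping of number -> contacts."""
    matches = crm.get(phone, [])
    if not matches:
        return IdentityResult("unknown", [])
    if len(matches) == 1:
        return IdentityResult("resolved", matches)
    return IdentityResult("ambiguous", matches)


def pick_most_recent(result: IdentityResult) -> Candidate | None:
    """Strategy B: choose the candidate with the most recent room, if any."""
    seen = [c for c in result.candidates if c.last_room_at is not None]
    if not seen:
        return None  # no history: fall back to Strategy A or C
    return max(seen, key=lambda c: c.last_room_at)
```

Returning None when no candidate has history keeps the strategies composable: the integrator can try auto-resolution first and fall back to a pending participant or a challenge.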

┌─────────────────────────────────────────────────────────────────────┐
│ Use Case 8: AI ↔ AI (multi-agent collaboration)                      │
│                                                                     │
│   [AI: analyst] ←── Room ──→ [AI: writer]                          │
│         ↑                          ↑                                │
│         │                          └── visibility=all               │
│         └── visibility=all                                          │
│                                                                     │
│ Two AI agents collaborate in a Room. The analyst reads documents    │
│ and produces structured findings. The writer reads the analyst's    │
│ findings and generates a client-ready report.                       │
│                                                                     │
│ Each AI responds to the other's events via on_event() — creating   │
│ an event chain. The framework tracks chain_depth on every event.   │
│ When chain_depth >= max_event_chain_depth (default 10), the chain  │
│ is stopped: the next event is blocked (status=BLOCKED,             │
│ blocked_by="event_chain_depth_limit"), an observation is emitted,  │
│ and side effects still flow.                                        │
│                                                                     │
│ Variations:                                                         │
│   → Add a human [Advisor: Browser] to review the final output     │
│   → Add an [Observer: read] for quality scoring (side effects)     │
│   → Mute the writer until the analyst is done (dynamic control)    │
│                                                                     │
│ Same primitives. No special "multi-agent" API. Rooms, channels,    │
│ permissions, and chain_depth handle it.                             │
└─────────────────────────────────────────────────────────────────────┘
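
The chain_depth guard in Use Case 8 can be illustrated with a small simulation. This is a sketch under one reading of the rule: an event whose own chain_depth reaches the limit is the one that gets blocked. Event, reply_to, admit, and run_chain are hypothetical names, and status values are plain strings here.

```python
from __future__ import annotations

from dataclasses import dataclass

MAX_EVENT_CHAIN_DEPTH = 10  # default named in the use case


@dataclass
class Event:
    source: str
    text: str
    chain_depth: int = 0
    status: str = "delivered"
    blocked_by: str | None = None


def reply_to(parent: Event, source: str, text: str) -> Event:
    """An event produced in response to another sits one level deeper."""
    return Event(source=source, text=text, chain_depth=parent.chain_depth + 1)


def admit(event: Event, max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> Event:
    """Mark the event as blocked once its depth reaches the limit."""
    if event.chain_depth >= max_depth:
        event.status = "blocked"
        event.blocked_by = "event_chain_depth_limit"
    return event


def run_chain(max_depth: int = MAX_EVENT_CHAIN_DEPTH) -> list[Event]:
    """Two agents answer each other until the guard stops the chain."""
    timeline = [admit(Event("analyst", "finding 0"), max_depth)]
    while timeline[-1].status != "blocked":
        parent = timeline[-1]
        responder = "writer" if parent.source == "analyst" else "analyst"
        child = reply_to(parent, responder, f"reply {parent.chain_depth + 1}")
        timeline.append(admit(child, max_depth))
    return timeline
```

The blocked event stays in the timeline, which mirrors the audit-trail behaviour described for blocked messages elsewhere in this RFC.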

What This Framework Is

  • A Room-based conversation manager that connects any combination of humans, AI, and programs
  • A channel abstraction where SMS, HTTP, AI, and custom channels share the same interface
  • A permission system (read/write/mute/visibility) that controls channel behavior
  • A hook system for intercepting, blocking, modifying, and enriching events
  • A provider abstraction where Twilio/Sinch/SendGrid are interchangeable implementations

What This Framework Is NOT

  • Not a CPaaS provider (Twilio, Sinch own the transport)
  • Not an AI framework (pydantic-ai handles agent logic)
  • Not a chat app (it's the plumbing behind any conversation-based app)
  • Not opinionated about business logic — the framework provides primitives (access, mute, visibility), the integrator decides when and why to use them

2. Quick Start (5-Minute Example)

SMS inbound → Room created → AI responds → SMS outbound. Copy-paste, it works.

from roomkit import RoomKit
from roomkit.channels.sms import SMSChannel
from roomkit.channels.ai import AIChannel
from roomkit.providers.sms.twilio import TwilioSMSProvider
from roomkit.providers.ai.pydantic_ai import PydanticAIProvider
from roomkit.store.memory import InMemoryStore
from roomkit.config.providers import TwilioConfig
from pydantic_ai import Agent

# 1. Providers
sms_provider = TwilioSMSProvider(config=TwilioConfig(
    account_sid="AC...", auth_token="secret", from_number="+15559876543",
))
ai_provider = PydanticAIProvider(agent=Agent(
    model="anthropic:claude-sonnet-4-5",
    instructions="You are a financial assistant. Reply in French and be concise.",
))

# 2. Channels
sms = SMSChannel(provider=sms_provider)
ai = AIChannel(provider=ai_provider, name="support")

# 3. RoomKit
fw = RoomKit(store=InMemoryStore())
fw.register_channel(sms)
fw.register_channel(ai)

# 4. When a new room is created, attach the AI automatically
@fw.hook(trigger="on_room_created", execution="async")
async def auto_attach_ai(room):
    await fw.attach_channel(room.id, "support", access="read_write", visibility="all")

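# 5. Process inbound (call this from your webhook handler; `await` needs an async context)
async def handle_inbound_sms(payload: dict):
    event = await fw.process_inbound("sms", payload)
    # → Room created → AI receives message → AI responds → SMS delivered to customer
    return event

# e.g. await handle_inbound_sms({"From": "+15551234567", "Body": "Bonjour"})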

That's it. The framework handles room creation, identity resolution, event routing, AI response generation, and SMS delivery. Everything else — hooks, permissions, dynamic channels, identity ambiguity — builds on top of this.


3. Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                  Integration Surfaces                                 │
│                                                                      │
│   ┌───────────────┐         ┌───────────────┐                        │
│   │   REST API    │         │  MCP Server   │                        │
│   │  (humans,     │         │  (AI agents,  │                        │
│   │   dashboards, │         │   tools,      │                        │
│   │   systems)    │         │   resources)  │                        │
│   └───────┬───────┘         └───────┬───────┘                        │
└───────────┼─────────────────────────┼────────────────────────────────┘
            │                         │
            ▼                         ▼
┌─────────────────────────────────────────────────────────────────────┐
│                       RoomKit Core                                   │
│                                                                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │
│  │ Room Manager  │  │ Event Router │  │  Identity    │               │
│  │              │  │              │  │  Resolver    │               │
│  └──────────────┘  └──────┬───────┘  └──────────────┘               │
│                           │                                          │
│                    ┌──────┴───────┐                                  │
│                    │  Hook Engine │  ← sync/async hooks              │
│                    │  (pipeline)  │    intercept, block, modify,     │
│                    └──────┬───────┘    enrich events                 │
│                           │                                          │
│  ┌──────────────────────────────────┐                               │
│  │       Conversation Store          │                               │
│  │   (Rooms, Events, Identities)    │                               │
│  └──────────────────────────────────┘                               │
└────────────────────────────┬────────────────────────────────────────┘
            Channel Interface │ (same for ALL)
      ┌──────────┬──────────┬┴─────────┬──────────┬──────────┐
      ▼          ▼          ▼          ▼          ▼          ▼
 ┌────────┐┌────────┐┌─────────┐┌──────────┐┌────────┐┌─────────┐
 │  SMS   ││ Email  ││  HTTP/  ││    AI    ││Observer││ Custom  │
 │Channel ││Channel ││  WS     ││ Channel  ││Channel ││ Channel │
 │        ││        ││ Channel ││          ││        ││         │
 └───┬────┘└───┬────┘└─────────┘└──────────┘└────────┘└─────────┘
     │         │
  Provider  Provider                Provider abstraction:
  Layer     Layer                   Channel type ≠ Provider
     │         │                    SMS can be Twilio OR Sinch
     ▼         ▼                    Email can be SendGrid OR SMTP
 ┌────────┐┌────────┐
 │Twilio  ││SndGrid │
 │Sinch   ││SMTP    │
 │Vonage  ││Mailgun │
 └────────┘└────────┘

Key separations:

Layer                  Responsibility
Integration Surfaces   REST API (for humans/systems) + MCP Server (for AI agents)
RoomKit Core           Room lifecycle, event routing, hook pipeline, permissions, store, identity
Channel Interface      Unified abstraction: everything is a Channel
Provider Layer         Interchangeable implementations (Twilio, Sinch, SendGrid...)

3. Core Concepts

3.1 The Room

A conversation space. Channels connect to it. Events flow through it. Hooks intercept them.

Room
├── id: str
├── organization_id: str                (multi-tenant isolation)
├── status: RoomStatus                  (active, paused, closed, archived)
├── channels: list[ChannelBinding]      (attached channels — dynamic, with permissions)
├── participants: list[Participant]     (humans in the conversation)
├── timeline: list[RoomEvent]           (everything that happened)
├── latest_index: int                   (highest event index — for unread calculations)
├── timers: RoomTimers | None           (auto-transitions — inspired by Twilio)
├── metadata: dict[str, Any]            (custom data)
├── created_at: datetime
└── updated_at: datetime

Room Timers (inspired by Twilio Conversations):

RoomTimers
├── inactive_after: timedelta | None    (auto-transition active → paused after this much inactivity)
├── closed_after: timedelta | None      (auto-transition paused → closed after this much further inactivity)
└── last_activity_at: datetime          (timestamp of last message event)

Example: RoomTimers(inactive_after=timedelta(minutes=30), closed_after=timedelta(hours=24))

  • After 30 min of no messages → Room moves to paused, emits room_paused event
  • After 24h more of no messages → Room moves to closed, emits room_closed event
  • Any new message resets the timer and moves the Room back to active

Timers are optional. If not set, the Room stays active until explicitly closed.
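
The timer rules can be expressed as a pure function of the last activity time. This is a sketch, not the framework's scheduler: status_for is a hypothetical helper, the RoomTimers class below is a stand-in for the structure above, and treating a missing inactive_after as "no auto-transition at all" is an assumption the RFC does not spell out.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class RoomTimers:  # stand-in mirroring the fields listed above
    last_activity_at: datetime
    inactive_after: timedelta | None = None
    closed_after: timedelta | None = None


def status_for(timers: RoomTimers | None, now: datetime) -> str:
    """Derive the timer-driven status; explicit closes are out of scope."""
    if timers is None or timers.inactive_after is None:
        return "active"  # no timers: the Room stays active
    idle = now - timers.last_activity_at
    if idle < timers.inactive_after:
        return "active"
    # closed_after counts from the moment the Room became paused.
    if timers.closed_after is not None and idle >= timers.inactive_after + timers.closed_after:
        return "closed"
    return "paused"
```

A new message would simply update last_activity_at, after which the same function returns "active" again.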

Key properties:

  • Channel-agnostic: The Room doesn't know SMS from AI; it routes events
  • Multi-channel: SMS + Browser + AI + Observer simultaneously
  • Dynamic: Channels can be attached/detached/muted/reconfigured at any time
  • Observable: Hooks and read-access channels see everything
  • Persistent: Survives across sessions, channel switches, escalations

3.2 Channel (Unified Abstraction)

A Channel is anything that interacts with a Room. Human on SMS, human in browser, AI agent, compliance program — all are Channels.

Channel
├── id: str
├── type: ChannelType                   (sms, email, http, websocket, ai, custom)
├── category: ChannelCategory           (transport | intelligence | integration)
├── direction: ChannelDirection         (inbound | outbound | bidirectional)
├── media_types: list[ChannelMediaType] (what this channel carries: text, audio, video)
├── capabilities: ChannelCapabilities   (what this channel supports)
├── provider: str | None                (twilio, sinch, sendgrid — for transport channels)
├── status: ChannelStatus               (connected, disconnected, error)
└── info: dict[str, Any]               (channel instance information — from_number, model, etc.)

Channel media types:

ChannelMediaType (enum)
├── TEXT                                (SMS, Email, Chat, WhatsApp text)
├── AUDIO                              (Phone calls, voice messages, WhatsApp audio)
└── VIDEO                              (Video calls, WhatsApp video)

Channel           media_types
SMS               [text]
Email             [text]                   (attachments are media, but channel carries text)
HTTP/WebSocket    [text]                   (could support audio/video via WebRTC later)
WhatsApp          [text, audio, video]
Phone (future)    [audio]
Video (future)    [video, audio, text]
AI                [text]                   (text generation — could produce audio/video in the future)

Media types serve three purposes:

  1. Routing: the framework knows which channels can carry which content
  2. Capability matching: AI can adapt generation to the media type
  3. Future-proofing: audio/video channels plug in without model changes
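
As an illustration of the routing purpose, the lookup below filters channels by the media type they can carry. It is a sketch only: CHANNEL_MEDIA copies a few rows from the table above, and channels_that_can_carry is a hypothetical helper, not a framework function.

```python
from enum import Enum


class ChannelMediaType(Enum):
    TEXT = "text"
    AUDIO = "audio"
    VIDEO = "video"


# A few rows from the media_types table, keyed by an illustrative channel name.
CHANNEL_MEDIA = {
    "sms": {ChannelMediaType.TEXT},
    "email": {ChannelMediaType.TEXT},
    "whatsapp": {ChannelMediaType.TEXT, ChannelMediaType.AUDIO, ChannelMediaType.VIDEO},
    "ai": {ChannelMediaType.TEXT},
}


def channels_that_can_carry(
    media: ChannelMediaType,
    channels: dict[str, set[ChannelMediaType]],
) -> list[str]:
    """Return the channel names whose media_types include the given type."""
    return [name for name, supported in channels.items() if media in supported]
```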

Channel info:

Each channel instance exposes an info property with its configuration details:

class SMSChannel(Channel):
    @property
    def info(self) -> dict[str, Any]:
        return {"provider": self.provider.name, "from_number": self.provider.from_number}

class AIChannel(Channel):
    @property
    def info(self) -> dict[str, Any]:
        return {"provider": self.provider.name, "model": self.provider.model_name}

This info is surfaced via the REST API and MCP resources, so integrators can query "what channels are in this room and what are they configured with?"

Three channel categories:

Category       Purpose                                         Examples
Transport      Carries messages to/from humans                 SMS, Email, HTTP/WebSocket, WhatsApp
Intelligence   Processes events, produces responses/insights   AI agent (pydantic-ai), sentiment analyzer
Integration    Connects to external systems                    CRM updater, calendar, notification service

Categories are informational — they help the integrator organize channels. The framework treats all channels the same through the unified interface:

class Channel(ABC):

    type: ChannelType
    category: ChannelCategory
    direction: ChannelDirection
    media_types: list[ChannelMediaType]
    capabilities: ChannelCapabilities

    @property
    def info(self) -> dict[str, Any]:
        """Channel instance information (provider config, from_number, model, etc.)
        Surfaced via REST API and MCP resources."""
        return {}

    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        """INBOUND: Called when something arrives from outside (webhook, WS message).
        Parses raw payload into a RoomEvent."""
        ...

    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        """OUTBOUND: Called when the framework needs to push an event to this channel's outside.
        Sends SMS, pushes to WebSocket, etc."""
        ...

    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        """READ: Called when a room event occurs and this channel has read access.
        Channel reacts internally (AI generates response, observer analyzes)."""
        ...
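
To make the READ path concrete, here is a sketch of a read-only custom channel that reacts to events without writing any. RoomEvent and ChannelOutput are simplified stand-ins for the framework types, and KeywordObserver is a hypothetical channel that skips the Channel base class so the example stays self-contained.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class RoomEvent:  # simplified stand-in for the framework type
    source_channel: str
    text: str


@dataclass
class ChannelOutput:  # simplified stand-in: events plus one kind of side effect
    events: list[RoomEvent] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)


class KeywordObserver:
    """Read-only channel: never writes Room events, only reports observations."""

    def __init__(self, keywords: list[str]):
        self.keywords = [k.lower() for k in keywords]

    async def on_event(self, event: RoomEvent, room: object = None) -> ChannelOutput:
        # Case-insensitive substring match against each configured keyword.
        hits = [k for k in self.keywords if k in event.text.lower()]
        return ChannelOutput(observations=[f"keyword:{k}" for k in hits])


if __name__ == "__main__":
    observer = KeywordObserver(["cancel", "refund"])
    output = asyncio.run(observer.on_event(RoomEvent("sms", "I want a REFUND")))
    print(output.observations)
```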

3.3 Channel Binding & Permissions

When a channel is attached to a Room, it gets a ChannelBinding that defines its permissions in that Room. This is the core primitive system.

ChannelBinding
├── channel_id: str
├── room_id: str
├── access: Access                      (read | write | read_write)
├── muted: bool                         (temporary: blocks writing, preserves reading)
├── write_visibility: Visibility        (when writing: who sees the events?)
│   ├── all                             (all channels in the room)
│   ├── channels(["http", "ai_x"])     (only specific channels)
│   └── internal                        (stored in timeline, not broadcast)
├── participant_id: str | None          (which human, if any)
├── last_read_index: int | None         (read horizon — last event index this channel has read)
├── attached_at: datetime
└── metadata: dict[str, Any]

Read horizon (inspired by Twilio): last_read_index tracks how far each channel/participant has read. This enables:

  • Unread count: room.latest_index - binding.last_read_index
  • Read receipts: when a channel advances its read index, emit a read_receipt event
  • UI badges: "3 unread messages"

Updated via:

await fw.mark_read(room_id, channel_id, index=42)    # mark as read up to event 42
await fw.mark_all_read(room_id, channel_id)           # mark everything as read
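
Because last_read_index may be None, the unread formula needs one extra case. The helper below is a sketch: unread_count is a hypothetical name, and treating None as "nothing read yet" with event indices starting at 0 is an assumption the RFC does not state.

```python
from __future__ import annotations


def unread_count(latest_index: int, last_read_index: int | None) -> int:
    """Number of events past the read horizon for one binding."""
    if last_read_index is None:
        # Assumption: indices start at 0 and nothing has been read yet.
        return latest_index + 1
    # Clamp at zero in case the read index is ahead of a stale latest_index.
    return max(0, latest_index - last_read_index)
```

With latest_index=45 and last_read_index=42 this yields the "3 unread messages" badge mentioned above.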

The three primitives:

Primitive    What it controls                                Framework provides               Integrator decides
Access       Can this channel read events? Write events?     read, write, read_write          When to set each access level
Muted        Temporarily block writing (reading continues)   mute(), unmute()                 When to mute/unmute
Visibility   When this channel writes, who receives?         all, channels([...]), internal   What visibility each channel gets

Rules:

Reading:
  access=read or access=read_write → channel receives events via on_event()
  access=write → channel does NOT receive events (write-only, rare use case)

Writing:
  access=write or access=read_write → channel can produce Room events
  muted=true → writing is temporarily blocked (overrides access)
  write_visibility determines which other channels see the events

Side Effects (always allowed):
  Tasks, observations, metadata updates → always produced regardless of access/muted
  A read-only channel can still flag compliance issues
  A muted channel can still observe and produce insights
  Side effects are stored separately and exposed via API/hooks
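
The reading and writing rules above reduce to three small predicates. This is a sketch with stand-in types: Binding carries only the fields needed here, and can_read, can_write, and can_produce_side_effects are hypothetical helper names.

```python
from dataclasses import dataclass
from enum import Enum


class Access(Enum):
    READ = "read"
    WRITE = "write"
    READ_WRITE = "read_write"


@dataclass
class Binding:  # reduced stand-in for ChannelBinding
    channel_id: str
    access: Access
    muted: bool = False


def can_read(binding: Binding) -> bool:
    """Muting does not affect reading."""
    return binding.access in (Access.READ, Access.READ_WRITE)


def can_write(binding: Binding) -> bool:
    """Write access is required, and muted overrides it."""
    return binding.access in (Access.WRITE, Access.READ_WRITE) and not binding.muted


def can_produce_side_effects(binding: Binding) -> bool:
    """Tasks, observations, and metadata updates are always allowed."""
    return True
```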

Named patterns: The framework doesn't impose these, but they name the common configurations:

Pattern     Configuration                                            Description
Direct      access=read_write, visibility=all                        Channel speaks to everyone. AI talks directly to the customer.
Assistant   access=read_write, visibility=channels(["ws_advisor"])   Channel writes, but only the advisor sees it. AI suggests, advisor decides.
Observer    access=read                                              Read-only. Analyzes events, produces side effects only (tasks, observations). Cannot write messages.
Muted       muted=true                                               Temporarily silenced. Still reads, still produces side effects. Writing is blocked until unmuted.
Internal    access=read_write, visibility=internal                   Writes are stored in timeline but not broadcast. Useful for audit logging.

These are vocabulary, not framework concepts. The integrator combines primitives however they want — the patterns just name the most frequent combinations.
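
How a visibility setting might translate into a recipient list can be sketched as follows. The recipients helper is hypothetical, visibility is modelled as the string "all", the string "internal", or a list of channel ids, and excluding the writer from its own broadcast is an assumption.

```python
from __future__ import annotations


def recipients(writer: str, visibility: str | list[str], readers: list[str]) -> list[str]:
    """Channels that receive an event written by `writer`.

    `readers` holds the ids of channels in the room that have read access.
    """
    others = [c for c in readers if c != writer]  # assumption: no echo to the writer
    if visibility == "all":
        return others
    if visibility == "internal":
        return []  # stored in the timeline, not broadcast
    if isinstance(visibility, str):
        raise ValueError(f"unknown visibility: {visibility!r}")
    return [c for c in others if c in visibility]
```

For the Assistant pattern, recipients("ai_support", ["ws_advisor"], readers) returns only the advisor's channel, so the customer on SMS never sees the suggestion.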

3.4 Two Output Paths

When a channel processes an event, it produces a ChannelOutput with two distinct paths:

ChannelOutput
├── Room Events (messages, responses)
│   ├── events: list[RoomEvent]
│   │
│   │   Subject to permissions:
│   │   - Channel must have write access
│   │   - Channel must not be muted
│   │   - Events are broadcast according to write_visibility
│   │   - Events are stored in the Room timeline
│   │
│   └── The Event Router enforces all of this. The channel just produces output.
└── Side Effects (always allowed)
    ├── tasks: list[Task]               (actionable items for external systems)
    ├── observations: list[Observation] (passive insights: sentiment, flags)
    └── metadata_updates: dict          (enrich Room metadata)

    Not subject to access/mute permissions.
    Routing suggestions (e.g., escalate to human) are expressed as Tasks.
    A read-only or muted channel can still produce side effects.

Why two paths?

  • A muted AI can't talk to the customer but CAN still flag "this customer is at risk of churning"
  • A read-only compliance channel can't send messages but CAN create tasks and flag PII
  • Muting silences the voice, not the brain
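
The split between the two paths can be sketched as a filter the Event Router might apply to a channel's output. ChannelOutput here is a simplified stand-in that uses strings for events and side effects, and apply_permissions is a hypothetical name. Whether withheld events are dropped or recorded as blocked is not specified in this section, so the sketch simply omits them.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ChannelOutput:  # simplified stand-in
    events: list[str] = field(default_factory=list)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)
    metadata_updates: dict[str, Any] = field(default_factory=dict)


def apply_permissions(output: ChannelOutput, has_write_access: bool, muted: bool) -> ChannelOutput:
    """Keep Room events only if the channel may write; always keep side effects."""
    may_write = has_write_access and not muted
    return ChannelOutput(
        events=list(output.events) if may_write else [],
        tasks=list(output.tasks),
        observations=list(output.observations),
        metadata_updates=dict(output.metadata_updates),
    )
```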

3.5 Provider Abstraction

A Channel type and its Provider are separate concepts. This applies to all channels — including AI.

Channel Type:  SMS           Email          WhatsApp
               │              │              │
Provider:      ├─ Twilio      ├─ SendGrid    ├─ Meta Cloud API
               ├─ Sinch       ├─ SMTP        └─ Twilio
               ├─ Vonage      ├─ Mailgun
               └─ Custom      └─ SES

Channel Type:  AI
Provider:      ├─ PydanticAIProvider (wraps pydantic-ai → Anthropic, OpenAI, Gemini, etc.)
               ├─ AnthropicProvider  (Anthropic SDK directly)
               ├─ OpenAIProvider     (OpenAI SDK directly)
               ├─ LangChainProvider  (LangChain)
               └─ Custom             (integrator writes their own)

Transport provider example (SMS):

class SMSChannel(Channel):
    type = ChannelType.SMS
    category = ChannelCategory.TRANSPORT
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, provider: SMSProvider):
        self.provider = provider

class SMSProvider(ABC):
    name: str
    from_number: str
    async def send_sms(self, to: str, body: str) -> ProviderResult: ...
    async def parse_webhook(self, payload: dict) -> InboundMessage: ...

class TwilioSMSProvider(SMSProvider):
    name = "twilio"
    def __init__(self, account_sid: str, auth_token: str, from_number: str): ...

class SinchSMSProvider(SMSProvider):
    name = "sinch"
    def __init__(self, app_id: str, app_secret: str, from_number: str): ...

Intelligence provider example (AI):

The AI channel follows the exact same pattern. The AIProvider ABC defines a generate() interface. The integrator chooses which lib to use — pydantic-ai, LangChain, raw SDKs, or anything custom.

class AIChannel(Channel):
    type = ChannelType.AI
    category = ChannelCategory.INTELLIGENCE
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, provider: AIProvider, name: str = "ai"):
        self.provider = provider
        self.name = name

class AIProvider(ABC):
    """Abstract AI provider — wraps any AI library or SDK."""

    name: str                              # e.g., "pydantic-ai", "anthropic", "openai"
    model_name: str                        # e.g., "claude-sonnet-4-5", "gpt-4o"

    @abstractmethod
    async def generate(
        self,
        messages: list[AIMessage],
        context: AIContext,
    ) -> AIResponse:
        """Generate a response given conversation history and context."""
        ...

@dataclass
class AIContext:
    """Context provided to the AI provider at generation time."""
    room: RoomContext
    target_capabilities: ChannelCapabilities | None   # target channel constraints
    target_media_types: list[ChannelMediaType]         # what the target can carry
    system_instructions: str | None                    # channel-level instructions
    metadata: dict[str, Any]                           # extra context from integrator

@dataclass
class AIResponse:
    """Response from AI provider."""
    text: str
    tasks: list[Task] = field(default_factory=list)
    observations: list[Observation] = field(default_factory=list)
    provider_metadata: dict[str, Any] = field(default_factory=dict)
    # provider_metadata: model, tokens_used, latency_ms, finish_reason, etc.

Framework-provided AI providers:

class PydanticAIProvider(AIProvider):
    """Wraps a pydantic-ai Agent. Supports Anthropic, OpenAI, Gemini, etc."""
    name = "pydantic-ai"

    def __init__(self, agent: Agent):
        self.agent = agent
        self.model_name = str(agent.model)

    async def generate(self, messages, context) -> AIResponse:
        result = await self.agent.run(
            user_prompt=messages[-1].text,
            deps=self._build_deps(context),
            message_history=self._convert_history(messages),
        )
        return AIResponse(text=result.output, provider_metadata={"model": self.model_name})

class AnthropicDirectProvider(AIProvider):
    """Uses Anthropic SDK directly (no pydantic-ai)."""
    name = "anthropic"

    def __init__(self, client: AsyncAnthropic, model: str = "claude-sonnet-4-5"):
        self.client = client
        self.model_name = model

    async def generate(self, messages, context) -> AIResponse:
        response = await self.client.messages.create(
            model=self.model_name,
            max_tokens=1024,  # required by the Messages API; 1024 is an illustrative value
            messages=self._convert_messages(messages),
            system=context.system_instructions or "",
        )
        return AIResponse(
            text=response.content[0].text,
            provider_metadata={"model": self.model_name, "tokens": response.usage.output_tokens},
        )

class MockAIProvider(AIProvider):
    """For testing — returns canned responses."""
    name = "mock"
    model_name = "mock"
    def __init__(self, responses: list[str]): ...

Why this matters:

  • Same pattern everywhere: SMS has SMSProvider, AI has AIProvider. Consistent abstraction.
  • Library-agnostic: pydantic-ai, LangChain, raw SDK, or custom; the framework doesn't care.
  • Swap providers: Change from OpenAI to Anthropic without touching Room logic.
  • Test with mocks: MockAIProvider for unit tests, no API calls needed.
  • Multi-tenant: Different organizations can use different AI providers.
  • The Room and Event Router never see provider details.
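
One possible body for MockAIProvider is sketched below to show the "test with mocks" point. AIProvider and AIResponse are reduced stand-ins for the definitions above; cycling through the canned responses and recording calls are assumptions, since the RFC leaves the implementation elided.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from itertools import cycle
from typing import Any


@dataclass
class AIResponse:  # reduced stand-in
    text: str
    provider_metadata: dict[str, Any] = field(default_factory=dict)


class AIProvider(ABC):  # reduced stand-in
    name: str
    model_name: str

    @abstractmethod
    async def generate(self, messages: list[Any], context: Any) -> AIResponse: ...


class MockAIProvider(AIProvider):
    """Returns canned responses in order, wrapping around at the end."""

    name = "mock"
    model_name = "mock"

    def __init__(self, responses: list[str]):
        if not responses:
            raise ValueError("MockAIProvider needs at least one response")
        self._responses = cycle(responses)
        self.calls: list[list[Any]] = []  # recorded inputs, handy for assertions

    async def generate(self, messages: list[Any], context: Any) -> AIResponse:
        self.calls.append(list(messages))
        return AIResponse(text=next(self._responses), provider_metadata={"model": self.model_name})


if __name__ == "__main__":
    provider = MockAIProvider(["Hello!", "How can I help?"])
    print(asyncio.run(provider.generate(["Bonjour"], None)).text)
```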

3.6 Channel Metadata — Three Levels

Channel-specific information is captured at three levels. Each level serves a different purpose:

┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 1: Channel Instance (static — lives on the Channel)                    │
│                                                                              │
│ Channel.info → dict[str, Any]                                               │
│                                                                              │
│ SMS:  {"provider": "twilio", "from_number": "+15551234", "account": "AC..."} │
│ AI:   {"provider": "pydantic-ai", "model": "claude-sonnet-4-5"}              │
│ HTTP: {"endpoint": "wss://app.example.com/ws", "protocols": ["websocket"]}  │
│                                                                              │
│ Purpose: What IS this channel? Surfaced via REST API / MCP resources.        │
│ Set by: Channel implementation (from its provider config).                   │
│ Lifetime: Channel instance lifetime. Doesn't change per room.               │
└──────────────────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 2: Channel Binding (per-room — lives on the ChannelBinding)            │
│                                                                              │
│ ChannelBinding.metadata → dict[str, Any]                                    │
│                                                                              │
│ AI in Room A: {"role": "support", "language": "fr", "persona": "formal"}    │
│ AI in Room B: {"role": "sales", "language": "en", "persona": "casual"}      │
│ SMS in Room: {"customer_name": "Jean", "priority": "high"}                  │
│                                                                              │
│ Purpose: Per-room channel configuration. Same AI channel, different behavior │
│          per room. Integrator-defined.                                        │
│ Set by: Integrator when attaching channel to room.                           │
│ Lifetime: Binding lifetime (attach to detach). Mutable via API.             │
└──────────────────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 3: Event Source (per-event — lives on the RoomEvent)                   │
│                                                                              │
│ EventSource.channel_data → ChannelData (typed, per-channel)                 │
│ EventSource.raw_payload → dict (original provider payload — never lost)     │
│                                                                              │
│ SMS event:  SMSChannelData { from_number, to_number, segments, encoding }   │
│ AI event:   AIChannelData { model, tokens_used, latency_ms, finish_reason } │
│ HTTP event: HTTPChannelData { session_id, user_agent, ip_address }          │
│                                                                              │
│ Purpose: What happened in THIS specific event? Provider-specific details.    │
│ Set by: Channel implementation when processing inbound or producing output. │
│ Lifetime: Immutable. Part of the event record forever.                      │
└──────────────────────────────────────────────────────────────────────────────┘

Example: querying all three levels for an SMS channel in a room:

# Level 1 — Channel instance info
channel = fw.get_channel("sms_main")
channel.info  # {"provider": "twilio", "from_number": "+15551234"}

# Level 2 — Binding metadata (per-room)
binding = await fw.get_binding(room_id, "sms_main")
binding.metadata  # {"customer_name": "Jean", "priority": "high"}

# Level 3 — Event source (per-event)
event = room.timeline[-1]
event.source.channel_data  # SMSChannelData(from_number="+15559876", to_number="+15551234", segments=1)
event.source.raw_payload   # {"MessageSid": "SM...", "From": "+15559876", ...} (Twilio raw)

3.7 Dynamic Channel Management

The framework provides operations to modify channels at runtime. The integrator decides when to call them:

# Primitives the framework provides:
await fw.attach_channel(room_id, channel_id, access="read_write", visibility="all")
await fw.detach_channel(room_id, channel_id)
await fw.set_access(room_id, channel_id, access="read")
await fw.mute(room_id, channel_id)
await fw.unmute(room_id, channel_id)
await fw.set_visibility(room_id, channel_id, visibility=Visibility.channels(["http"]))

Each operation emits a RoomEvent (channel_attached, channel_detached, channel_muted, channel_unmuted, channel_updated) so the timeline records what happened.
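As a rough illustration of "every operation leaves a trace in the timeline", the sketch below uses a hypothetical in-memory `Room` class. It only models mute and unmute, and the event dictionaries are an assumed shape, not the RoomEvent model from this RFC.

```python
from dataclasses import dataclass, field


@dataclass
class Room:
    """Hypothetical in-memory room: mute state plus an append-only timeline."""

    muted: set[str] = field(default_factory=set)
    timeline: list[dict] = field(default_factory=list)

    def _emit(self, event_type: str, channel_id: str) -> None:
        # The sequential index mirrors the per-room ordering described in §5.1.
        self.timeline.append(
            {"index": len(self.timeline), "type": event_type, "channel": channel_id}
        )

    def mute(self, channel_id: str) -> None:
        self.muted.add(channel_id)
        self._emit("channel_muted", channel_id)

    def unmute(self, channel_id: str) -> None:
        self.muted.discard(channel_id)
        self._emit("channel_unmuted", channel_id)


room = Room()
room.mute("ai_support")
room.unmute("ai_support")
event_types = [e["type"] for e in room.timeline]
```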

3.8 Channel Direction (Inbound / Outbound)

A channel has inherent directional capabilities — what it can physically do, independent of per-room permissions:

ChannelDirection (enum)
├── INBOUND                             (can receive from outside → produce events)
├── OUTBOUND                            (can send to outside → deliver events)
└── BIDIRECTIONAL                       (both)

Channel           Direction        Why
SMS               BIDIRECTIONAL    Receives webhooks (inbound), sends SMS (outbound)
Email             BIDIRECTIONAL    Receives inbound email, sends outbound email
HTTP/WebSocket    BIDIRECTIONAL    Receives user input, pushes events to browser
AI                BIDIRECTIONAL    Receives room events (inbound), produces responses (outbound)
Observer          INBOUND          Receives events, produces side effects only (no outbound delivery)
Notification      OUTBOUND         Only sends notifications (Slack, push), never receives
Webhook (custom)  INBOUND          Receives external webhooks, injects events into room

Direction vs Access:

Direction = what the channel CAN do (physical/technical — set at channel level)
Access    = what the channel MAY do (permission — set per room binding)

A BIDIRECTIONAL channel can be given access=read (restricting it to inbound only in a room).
An INBOUND channel given access=write makes no sense — the framework validates this.
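The validation mentioned above could look roughly like the following. This sketch encodes only the one rule the text states (an INBOUND channel cannot be granted write access). Treating `read_write` as including write is an assumption, and `validate_access` is a hypothetical helper name.

```python
from enum import Enum


class ChannelDirection(Enum):
    INBOUND = "inbound"
    OUTBOUND = "outbound"
    BIDIRECTIONAL = "bidirectional"


def validate_access(direction: ChannelDirection, access: str) -> None:
    """Raise ValueError when the requested access contradicts the direction."""
    # Assumption: "read_write" counts as write access for this check.
    grants_write = access in ("write", "read_write")
    if direction is ChannelDirection.INBOUND and grants_write:
        raise ValueError(
            f"access={access!r} is not valid for an INBOUND channel"
        )


# A BIDIRECTIONAL channel restricted to read access in one room is accepted.
validate_access(ChannelDirection.BIDIRECTIONAL, "read")
```

Rules for OUTBOUND channels are left out on purpose, since the text does not state them.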

The Channel ABC declares its direction:

from abc import ABC
from typing import Any

class Channel(ABC):
    type: ChannelType
    category: ChannelCategory
    direction: ChannelDirection              # what this channel can physically do
    media_types: list[ChannelMediaType]
    capabilities: ChannelCapabilities

    # INBOUND: must implement handle_inbound
    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        """Called when something arrives from outside (webhook, user input)."""
        ...

    # OUTBOUND: must implement deliver
    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        """Called when the framework needs to deliver an event to this channel's outside."""
        ...

    # READ: must implement on_event (for processing/reacting to room events)
    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        """Called when a room event occurs and this channel has read access."""
        ...

Three methods, three concerns:

Method          When called                                         Direction             Example
handle_inbound  External payload arrives (webhook, WS message)      INBOUND               Twilio webhook → SMS event
deliver         Framework needs to push event to channel's outside  OUTBOUND              Send SMS, push to WebSocket
on_event        Room event occurs, channel has read access          READ (any direction)  AI processes event, observer analyzes

on_event vs deliver:
- on_event: The channel reacts to a room event internally (AI generates a response, observer flags something). Produces ChannelOutput.
- deliver: The framework pushes a room event out to the channel's external recipient (send SMS to phone, push to browser via WebSocket). Returns DeliveryResult.

For transport channels, the Event Router calls BOTH:
1. on_event: let the channel react (usually a no-op for transport)
2. deliver: push the event to the external recipient

For intelligence channels, only on_event is called (AI doesn't "deliver" to an external system — it produces output back into the room).
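The dispatch rule described above can be sketched as follows. `RecordingChannel` and `dispatch` are hypothetical names, the `INTELLIGENCE` enum member is assumed from the prose, and events are plain dictionaries rather than RoomEvent instances.

```python
import asyncio
from enum import Enum


class ChannelCategory(Enum):
    TRANSPORT = "transport"
    INTELLIGENCE = "intelligence"  # assumed member name


class RecordingChannel:
    """Hypothetical stub that records which methods the router called."""

    def __init__(self, category: ChannelCategory) -> None:
        self.category = category
        self.calls: list[str] = []

    async def on_event(self, event: dict) -> None:
        self.calls.append("on_event")

    async def deliver(self, event: dict) -> None:
        self.calls.append("deliver")


async def dispatch(event: dict, channels: list[RecordingChannel]) -> None:
    """Every channel reacts via on_event; only transport channels also deliver."""
    for channel in channels:
        await channel.on_event(event)
        if channel.category is ChannelCategory.TRANSPORT:
            await channel.deliver(event)


sms = RecordingChannel(ChannelCategory.TRANSPORT)
ai = RecordingChannel(ChannelCategory.INTELLIGENCE)
asyncio.run(dispatch({"type": "message", "text": "hello"}, [sms, ai]))
```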

3.9 Real-Time Event System

The framework manages real-time connections and provides an event subscription system.

Two levels of events:

┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 1: Room Events (per-room, stored in timeline)                          │
│                                                                              │
│ Events within a room's conversation:                                         │
│ - message, typing, read_receipt, delivery_status                             │
│ - channel_attached, channel_detached, channel_muted, channel_unmuted         │
│ - participant_joined, participant_left                                        │
│ - task_created, observation                                                  │
│                                                                              │
│ Stored in Room timeline. Subject to visibility rules.                        │
│ Delivered to channels with read access in the room.                          │
└──────────────────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────────────────┐
│ Level 2: Framework Events (global, for external subscribers)                 │
│                                                                              │
│ Events about the framework state:                                            │
│ - room_created, room_closed, room_archived                                   │
│ - channel_registered, channel_unregistered                                   │
│ - channel_connected, channel_disconnected, channel_error                     │
│ - room_channel_attached, room_channel_detached                               │
│ - identity_created, identity_linked                                          │
│                                                                              │
│ NOT stored in a room timeline. Published to subscribers.                     │
│ Used by dashboards, monitoring, external systems.                            │
└──────────────────────────────────────────────────────────────────────────────┘

Real-time transport for browser/WebSocket channels:

import json

class WebSocketChannel(Channel):
    """Browser-based channel using WebSocket for real-time communication."""

    type = ChannelType.WEBSOCKET
    category = ChannelCategory.TRANSPORT
    direction = ChannelDirection.BIDIRECTIONAL
    media_types = [ChannelMediaType.TEXT]

    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        """User sends a message or typing indicator via WebSocket."""
        data = json.loads(raw_payload)
        msg_type = data.get("type")
        if msg_type == "typing":
            return InboundResult(event=RoomEvent(type=EventType.TYPING, ...))
        if msg_type == "message":
            return InboundResult(event=RoomEvent(type=EventType.MESSAGE, ...))
        # Reject unknown message types explicitly instead of returning None.
        raise ValueError(f"Unsupported WebSocket message type: {msg_type!r}")

    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        """Push event to browser via WebSocket connection."""
        ws = self._get_connection(binding.participant_id)
        if ws and ws.connected:
            await ws.send_json(self._serialize_event(event))
            return DeliveryResult(status="sent")
        return DeliveryResult(status="failed", error="disconnected")

Typing indicators — full flow:

Browser User types...
  → WebSocket message: {"type": "typing", "room_id": "room_1"}
  → WebSocketChannel.handle_inbound() → RoomEvent(type=TYPING)
  → Event Router broadcasts to channels with read access:
      → Other WebSocket channels: deliver() → push typing indicator to other browsers
      → AI channel: on_event() → may ignore (or use for "user is typing" awareness)
      → SMS channel: deliver() is skipped (SMS doesn't support typing indicators)
        (ChannelCapabilities.supports_typing_indicator = false → Event Router skips delivery)

Framework event subscription:

# Subscribe to framework-level events (for dashboards, monitoring)
@fw.on("room_created")
async def on_room_created(event: FrameworkEvent):
    """Notify dashboard that a new room was created."""
    await dashboard_ws.broadcast({"type": "room_created", "room": event.data})

@fw.on("channel_connected")
async def on_channel_connected(event: FrameworkEvent):
    """Monitor channel health."""
    await metrics.record("channel.connected", tags={"type": event.data["channel_type"]})

@fw.on("room_channel_attached")
async def on_channel_added_to_room(event: FrameworkEvent):
    """Notify room participants that a new channel joined."""
    await dashboard_ws.send_to_room(event.data["room_id"], {
        "type": "channel_joined",
        "channel": event.data["channel_info"],
    })
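One way a decorator-based subscription like `fw.on(...)` could work internally is a small in-process publish/subscribe registry. The sketch below is only an illustration under that assumption; `FrameworkEventBus`, `publish`, and the dictionary payload are hypothetical and not part of the API defined in this RFC.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class FrameworkEventBus:
    """Hypothetical in-process pub/sub keyed by framework event name."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event_name: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a coroutine handler for one event name."""

        def register(handler: Handler) -> Handler:
            self._handlers[event_name].append(handler)
            return handler

        return register

    async def publish(self, event_name: str, data: dict) -> None:
        # Run all subscribers concurrently. A production bus would also
        # isolate failures so one handler cannot break the others.
        await asyncio.gather(*(h(data) for h in self._handlers[event_name]))


bus = FrameworkEventBus()
seen: list[str] = []


@bus.on("room_created")
async def record_room(data: dict) -> None:
    seen.append(data["room_id"])


asyncio.run(bus.publish("room_created", {"room_id": "room_1"}))
```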

Connection management:

class RoomKit:
    async def connect_websocket(self, ws: WebSocket, room_id: str, participant_id: str):
        """Register a WebSocket connection for a participant in a room."""
        ...

    async def disconnect_websocket(self, ws: WebSocket):
        """Clean up on disconnect. Emits channel_disconnected event."""
        ...

    def get_connected_channels(self, room_id: str) -> list[ChannelBinding]:
        """List currently connected channels for a room (with connection status)."""
        ...

3.10 Room & Channel Metadata

Both Rooms and ChannelBindings carry mutable metadata: dict[str, Any] that can be updated at any time via the API:

# Room metadata — set at creation or updated later
room = await fw.create_room(org_id="org_1", metadata={"source": "inbound_sms", "priority": "high"})
await fw.update_room_metadata(room.id, {"language": "fr", "customer_tier": "premium"})

# Binding metadata — set when attaching or updated later
await fw.attach_channel(room.id, "support", access="read_write",
                        metadata={"persona": "formal", "language": "fr"})
await fw.update_binding_metadata(room.id, "support", {"persona": "casual"})

Metadata is:
- Arbitrary: any JSON-serializable dict
- Mutable: update via API at any time (emits room_updated / channel_updated event)
- Accessible in hooks: room.metadata, binding.metadata available in all hook handlers
- Accessible to AI: AIContext.metadata passes room metadata to the AI provider
- Queryable: find_rooms(org_id, metadata_filter={"priority": "high"}) for room lookup
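The RFC does not spell out how `metadata_filter` is matched. A plausible reading, assumed here, is exact equality on every key in the filter. `matches_metadata` is a hypothetical helper and the rooms are plain dictionaries for illustration.

```python
from typing import Any


def matches_metadata(metadata: dict[str, Any], metadata_filter: dict[str, Any]) -> bool:
    """True when every filter key is present in metadata with an equal value."""
    return all(
        key in metadata and metadata[key] == expected
        for key, expected in metadata_filter.items()
    )


rooms = {
    "room_1": {"priority": "high", "language": "fr"},
    "room_2": {"priority": "low", "language": "fr"},
}
high_priority = [
    room_id
    for room_id, meta in rooms.items()
    if matches_metadata(meta, {"priority": "high"})
]
```

Under this reading an empty filter matches every room, and extra metadata keys on the room are ignored.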


4. Hook System

Hooks are the framework's middleware. They intercept events in a pipeline.

4.1 Hook Types

Hook
├── name: str
├── trigger: HookTrigger              (when does this hook fire?)
│   ├── before_broadcast              (before event reaches channels — can BLOCK)
│   ├── after_broadcast               (after all channels processed — for logging)
│   ├── on_channel_attached
│   ├── on_channel_detached
│   ├── on_channel_muted
│   ├── on_channel_unmuted
│   ├── on_room_created
│   ├── on_room_closed
│   ├── on_task_created
│   ├── on_identity_ambiguous         (identity resolution returned multiple candidates)
│   ├── on_identity_unknown           (identity resolution returned no matches)
│   ├── on_participant_identified     (pending participant was resolved)
│   └── on_error
├── execution: HookExecution          (sync or async?)
│   ├── sync                          (blocking — event waits for hook to complete)
│   └── async                         (non-blocking — fire and forget)
├── priority: int                      (execution order — lower = first)
└── handler: Callable                  (the function to run)

4.2 Sync Hooks (Blocking)

Sync hooks run before the event is broadcast. They can:
- Allow the event (return HookResult.allow())
- Block the event (return HookResult.block(...)), optionally injecting replacement events and side effects
- Modify the event (return HookResult.modify(new_event))

HookResult model:

HookResult
├── action: "allow" | "block" | "modify"
├── reason: str | None                          (why blocked/modified — stored on event)
├── modified_event: RoomEvent | None            (only for action=modify)
├── inject: list[InjectedEvent]                 (targeted events sent INSTEAD of blocked event)
├── tasks: list[Task]                           (side effects — always processed)
└── observations: list[Observation]             (side effects — always processed)

InjectedEvent
├── content: EventContent                        (what to send)
├── target_channel: str | None                   (send to ONE specific channel)
├── target_channels: list[str] | None            (send to MULTIPLE specific channels)
├── visibility: Visibility = all                 (fallback if no target specified)
└── source_label: str = "system"                 (identifies the injected event source)

When a hook blocks an event:
1. The original event is stored in the timeline with status=BLOCKED and blocked_by=hook_name (audit trail)
2. The original event is NOT delivered to any channel
3. Each InjectedEvent is delivered to its targeted channel(s) as a new event with status=DELIVERED
4. Tasks and observations are processed as side effects (compliance logging, alerts, etc.)
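The four steps above can be sketched with simplified stand-ins. The dataclasses below carry only the fields needed for the illustration, events are plain dictionaries, and `apply_block`, `outbox`, and `audit_log` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class InjectedEvent:
    text: str
    target_channels: list[str]


@dataclass
class HookResult:
    action: str  # "allow" | "block" | "modify"
    inject: list[InjectedEvent] = field(default_factory=list)
    observations: list[dict] = field(default_factory=list)


def apply_block(event: dict, hook_name: str, result: HookResult,
                timeline: list[dict], outbox: list[tuple[str, str]],
                audit_log: list[dict]) -> None:
    """Apply the blocked-event steps to in-memory lists."""
    # 1. Store the original with status=blocked for the audit trail.
    timeline.append({**event, "status": "blocked", "blocked_by": hook_name})
    # 2. The original is never added to the outbox, so it reaches no channel.
    # 3. Each injected event goes only to its targeted channels.
    for injected in result.inject:
        timeline.append({"text": injected.text, "status": "delivered"})
        for channel_id in injected.target_channels:
            outbox.append((channel_id, injected.text))
    # 4. Side effects are always processed.
    audit_log.extend(result.observations)


timeline: list[dict] = []
outbox: list[tuple[str, str]] = []
audit_log: list[dict] = []
result = HookResult(
    action="block",
    inject=[InjectedEvent("Message blocked.", ["sms_main"])],
    observations=[{"type": "sensitivity_violation"}],
)
apply_block({"text": "123-456-789"}, "sensitivity_scanner", result,
            timeline, outbox, audit_log)
```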

Example — SIN sensitivity scanner:

import re

# Social Insurance Number: nine digits in groups of three, optionally
# separated by "-" or whitespace. Compiled once at import time.
SIN_PATTERN = re.compile(r'\b\d{3}[-\s]?\d{3}[-\s]?\d{3}\b')

@fw.hook(trigger="before_broadcast", execution="sync", priority=0)
async def sensitivity_scanner(event: RoomEvent, room: RoomContext) -> HookResult:
    """Block messages containing sensitive personal data (SIN)."""
    if event.type != EventType.MESSAGE:
        return HookResult.allow()

    text = event.content.text

    if SIN_PATTERN.search(text):
        return HookResult.block(
            reason="SIN detected — blocked for security",
            inject=[
                # Notify the sender (customer on SMS).
                # English: "Your message was blocked. Social insurance numbers
                # cannot be sent by SMS for security reasons."
                InjectedEvent(
                    target_channel=event.source_channel,
                    content=TextContent(
                        text="Votre message a été bloqué. Les numéros d'assurance "
                             "sociale ne peuvent pas être envoyés par SMS pour des "
                             "raisons de sécurité."
                    ),
                ),
                # Notify all OTHER channels (advisor on WebSocket).
                # English: "The customer tried to send sensitive data (SIN).
                # The message was blocked for security reasons."
                InjectedEvent(
                    target_channels=room.other_channels(event.source_channel),
                    content=TextContent(
                        text="Le client a tenté d'envoyer des données sensibles (NAS). "
                             "Le message a été bloqué pour des raisons de sécurité."
                    ),
                ),
            ],
            observations=[
                Observation(
                    type="sensitivity_violation",
                    data={
                        "pattern": "SIN",
                        "channel": event.source_channel,
                        "room_id": room.id,
                    },
                ),
            ],
        )

    # Credit card: redact instead of blocking
    if contains_credit_card(text):
        redacted_text = redact_credit_card(text)
        return HookResult.modify(event.with_content(TextContent(text=redacted_text)))

    return HookResult.allow()

4.3 Async Hooks (Non-Blocking)

Async hooks run after the event is broadcast. They don't affect the event flow:

@fw.hook(trigger="after_broadcast", execution="async")
async def log_to_audit_trail(event: RoomEvent, room: RoomContext):
    await audit_service.log(event)

@fw.hook(trigger="on_task_created", execution="async")
async def dispatch_task(task: Task, room: RoomContext):
    if task.type == "callback":
        await schedule_callback(task)
    elif task.type == "crm_update":
        await update_crm(task)

4.4 Hook Pipeline

Inbound Event
               ▼
┌──────────────────────────────────┐
│ Sync Hooks (ordered by priority) │
│                                  │
│ [0] Sensitivity Scanner          │
│     → allow / block+inject       │
│ [1] Profanity Filter             │
│     → allow / modify (redact)    │
│ [2] Rate Limiter → allow/block   │
│ [3] Language Detector → modify   │
│     (adds metadata.language)     │
└──────────────┬───────────────────┘
          blocked? ──yes──┐
               │          │
               │     ┌────▼──────────────────────────────────┐
               │     │ 1. Store original event with           │
               │     │    status=BLOCKED, blocked_by=hook     │
               │     │    (audit trail — never delivered)      │
               │     │                                        │
               │     │ 2. Deliver InjectedEvents to           │
               │     │    targeted channels                   │
               │     │    → customer gets security notice     │
               │     │    → advisor gets notification         │
               │     │                                        │
               │     │ 3. Process side effects                │
               │     │    → observations (compliance log)     │
               │     │    → tasks (if any)                    │
               │     └────────────────────────────────────────┘
               ▼ (allowed, possibly modified)
┌──────────────────────────────────┐
│ Event Router                     │
│ Checks permissions:              │
│ - Source has write access?       │
│ - Source is not muted?           │
│ Broadcasts to channels with      │
│ read access, filtered by         │
│ source's write_visibility        │
│ Event stored with                │
│ status=DELIVERED                 │
└──────────────┬───────────────────┘
               ▼
┌──────────────────────────────────┐
│ Async Hooks (fire and forget)    │
│                                  │
│ [·] Audit Logger                 │
│ [·] Analytics                    │
│ [·] Webhook Notifier             │
└──────────────────────────────────┘
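The sync stage of the pipeline above can be sketched as a loop over hooks sorted by priority. Stopping at the first block and passing a modified event on to later hooks are assumptions about the intended semantics. `Hook`, `run_sync_hooks`, and the two sample handlers are hypothetical, and events are plain dictionaries.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class HookResult:
    action: str  # "allow" | "block" | "modify"
    modified_event: Optional[dict] = None


@dataclass
class Hook:
    name: str
    priority: int
    handler: Callable[[dict], Awaitable[HookResult]]


async def run_sync_hooks(
    event: dict, hooks: list[Hook]
) -> tuple[Optional[dict], Optional[str]]:
    """Return (event_to_broadcast, blocking_hook_name); event is None if blocked."""
    for hook in sorted(hooks, key=lambda h: h.priority):  # lower runs first
        result = await hook.handler(event)
        if result.action == "block":
            return None, hook.name  # assumed: later hooks do not run
        if result.action == "modify":
            event = result.modified_event  # later hooks see the change
    return event, None


async def redact(event: dict) -> HookResult:
    return HookResult("modify", {**event, "text": event["text"].replace("darn", "****")})


async def rate_limit(event: dict) -> HookResult:
    return HookResult("block") if event.get("flood") else HookResult("allow")


hooks = [Hook("rate_limiter", 2, rate_limit), Hook("profanity_filter", 1, redact)]
allowed = asyncio.run(run_sync_hooks({"text": "darn it"}, hooks))
blocked = asyncio.run(run_sync_hooks({"text": "hi", "flood": True}, hooks))
```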

4.5 Room-Level vs Global Hooks

# Global hook — applies to ALL rooms
@fw.hook(trigger="before_broadcast", execution="sync")
async def global_pii_blocker(event, room):
    ...

# Room-level hook — applies to ONE specific room
await fw.add_room_hook(
    room_id="room_123",
    trigger="before_broadcast",
    execution="sync",
    handler=custom_compliance_check,
)

4.6 Hook vs Read-Only Channel — When to Use Which?

Capability                  Sync Hook                Async Hook       Read-Only Channel
Can block events            Yes                      No               No
Can modify events           Yes                      No               No
Can inject targeted events  Yes (on block)           No               No
Can produce tasks           Yes (on block)           Yes              Yes (side effects)
Can produce observations    Yes (on block)           Yes              Yes (side effects)
Can produce responses       No                       No               No (read-only)
Needs AI/LLM                Usually no (rule-based)  Maybe            Usually yes
Runs                        Before broadcast         After broadcast  During broadcast

Rule of thumb:
- Need to block and replace with targeted responses? → Sync hook with HookResult.block(inject=[...])
- Need to block or modify events (simple)? → Sync hook (fast, rule-based)
- Need to react to events with side effects? → Async hook
- Need AI-powered analysis producing insights? → Channel with access=read (produces side effects only)
- Need AI responses to the conversation? → Channel with access=read_write


5. Event Model

5.1 Room Events

Everything in a Room is a RoomEvent:

RoomEvent
├── id: str
├── room_id: str
├── index: int                  (sequential per room: 0, 1, 2, ... — for pagination & read tracking)
├── timestamp: datetime
├── type: EventType
│   ├── message              (text, media, rich content)
│   ├── participant_joined
│   ├── participant_left
│   ├── channel_attached
│   ├── channel_detached
│   ├── channel_muted
│   ├── channel_unmuted
│   ├── channel_updated      (access or visibility changed)
│   ├── task_created
│   ├── observation
│   ├── participant_identified  (pending participant was resolved)
│   ├── typing
│   ├── read_receipt
│   ├── delivery_status      (sent, delivered, failed)
│   ├── system               (errors, notifications)
│   └── custom               (extensible — e.g., "custom:your_namespace")
├── status: EventStatus         (delivered | blocked | failed)
├── blocked_by: str | None      (hook name that blocked it, if status=blocked)
├── source_channel: str      (which channel produced this)
├── participant_id: str | None  (which human, if applicable)
├── content: EventContent    (normalized payload)
├── source: EventSource      (original channel-specific data)
├── visibility: Visibility   (inherited from source channel's write_visibility)
├── chain_depth: int               (0 = human/inbound origin; incremented on channel re-entry)
├── correlation_id: str | None  (integrator's external reference — echoed in delivery reports)
└── metadata: dict[str, Any]

Sequential indexing (inspired by Twilio): Every event in a room gets a monotonically increasing index. This enables:
- Efficient pagination: "get events 50-100"
- Read horizon: "participant last read index 42" → events 43+ are unread
- Gap detection: missing index = lost event
- Ordering guarantees without relying on timestamp precision
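The benefits listed above follow directly from having a dense integer index. A minimal sketch, with events as plain dictionaries and hypothetical helper names (`page`, `unread`, `missing_indexes`):

```python
def page(timeline: list[dict], start: int, end: int) -> list[dict]:
    """Events whose index falls in the half-open range [start, end)."""
    return [e for e in timeline if start <= e["index"] < end]


def unread(timeline: list[dict], last_read_index: int) -> list[dict]:
    """Events after the participant's read horizon."""
    return [e for e in timeline if e["index"] > last_read_index]


def missing_indexes(received: list[int]) -> list[int]:
    """Gaps in a received run of indexes signal lost events."""
    if not received:
        return []
    present = set(received)
    return [i for i in range(min(received), max(received) + 1) if i not in present]


timeline = [{"index": i, "text": f"msg {i}"} for i in range(5)]
second_page = [e["index"] for e in page(timeline, 1, 3)]
still_unread = [e["index"] for e in unread(timeline, 2)]
gaps = missing_indexes([0, 1, 3, 4, 6])
```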

5.2 Event Content (Normalized Layer)

Channel-agnostic content, organized by media type:

EventContent (discriminated union)

Text media:
├── TextContent { text }
├── RichContent { text, buttons, cards, quick_replies }

Visual/file media:
├── MediaContent { url, mime_type, caption, size_bytes }
├── LocationContent { latitude, longitude, label }

Audio media (Phase 4+):
├── AudioContent { url, duration_seconds, mime_type, size_bytes, transcript }

Video media (Phase 4+):
├── VideoContent { url, duration_seconds, mime_type, size_bytes, thumbnail_url, transcript }

Composite:
├── CompositeContent { parts: list[EventContent] }
│   (e.g., a WhatsApp message with text + image + audio)

System:
└── SystemContent { code, message, data }

Each content type maps to a ChannelMediaType:
- TextContent, RichContent → requires channel with TEXT media type
- AudioContent → requires channel with AUDIO media type
- VideoContent → requires channel with VIDEO media type
- MediaContent → depends on mime_type (image → TEXT channel can handle via attachments)
- CompositeContent → requires channel supporting all parts' media types

The Event Router checks media type compatibility: if a channel doesn't support the content's media type, the event is not delivered to that channel (or a fallback is triggered, e.g., audio → transcript as text).
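A compatibility check of this kind could be sketched as a set comparison. The content classes below are reduced to the minimum needed, only text, audio, and composite content are handled, and `required_media_types` and `can_deliver` are hypothetical helper names.

```python
from dataclasses import dataclass
from enum import Enum


class ChannelMediaType(Enum):
    TEXT = "text"
    AUDIO = "audio"
    VIDEO = "video"


@dataclass
class TextContent:
    text: str


@dataclass
class AudioContent:
    url: str


@dataclass
class CompositeContent:
    parts: list


def required_media_types(content) -> set[ChannelMediaType]:
    """Media types a channel must support to receive this content."""
    if isinstance(content, CompositeContent):
        # A composite needs every media type required by any of its parts.
        required: set[ChannelMediaType] = set()
        for part in content.parts:
            required |= required_media_types(part)
        return required
    if isinstance(content, AudioContent):
        return {ChannelMediaType.AUDIO}
    return {ChannelMediaType.TEXT}


def can_deliver(content, channel_media_types: list[ChannelMediaType]) -> bool:
    """True when the channel supports every required media type."""
    return required_media_types(content) <= set(channel_media_types)


voice_note = CompositeContent(
    parts=[TextContent("Listen:"), AudioContent("https://example.com/a.ogg")]
)
```

When `can_deliver` returns False, the router would either skip the channel or trigger a fallback such as the transcript-as-text path mentioned above.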

5.3 Event Source (Channel-Specific Layer)

Preserves original channel data:

EventSource
├── channel_type: ChannelType
├── provider: str                   (twilio, sinch, anthropic, etc.)
├── raw_payload: dict[str, Any]     (original provider payload — never lost)
├── provider_message_id: str | None
└── channel_data: ChannelData       (typed, per-channel structured data)
    ├── SMSChannelData { from_number, to_number, segments, encoding }
    ├── EmailChannelData { from, to, cc, subject, thread_id, html_body, attachments }
    ├── WhatsAppChannelData { wa_id, template, buttons, context }
    ├── HTTPChannelData { session_id, user_agent, ip_address }
    ├── AIChannelData { model, agent_name, tokens_used, latency_ms }
    └── CustomChannelData { data: dict }

Two layers, one principle: normalize for routing, preserve for specifics.

This is Level 3 of the three-level channel metadata system (see §3.6). Level 1 (channel instance info) and Level 2 (binding metadata) provide static/per-room context. Level 3 captures what happened in each specific event.
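To make "normalize for routing, preserve for specifics" concrete, here is a minimal sketch that builds an event source from an SMS webhook payload. The `MessageSid` and `From` keys appear in §3.6. The `To` and `NumSegments` keys, the reduced field set, and the `source_from_sms_webhook` helper are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class SMSChannelData:
    from_number: str
    to_number: str
    segments: int


@dataclass(frozen=True)
class EventSource:
    provider: str
    channel_data: SMSChannelData
    raw_payload: dict[str, Any]
    provider_message_id: Optional[str] = None


def source_from_sms_webhook(payload: dict[str, Any]) -> EventSource:
    """Extract typed fields for routing and keep the full payload alongside."""
    return EventSource(
        provider="twilio",
        channel_data=SMSChannelData(
            from_number=payload["From"],
            to_number=payload["To"],
            # Assumed key; defaults to a single segment when absent.
            segments=int(payload.get("NumSegments", 1)),
        ),
        raw_payload=dict(payload),  # untouched copy of the original payload
        provider_message_id=payload.get("MessageSid"),
    )


payload = {
    "MessageSid": "SM123",
    "From": "+15559876",
    "To": "+15551234",
    "NumSegments": "1",
    "Body": "Bonjour",
}
source = source_from_sms_webhook(payload)
```

The frozen dataclasses reflect the "immutable, part of the event record" lifetime described for Level 3.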


6. Channel Capability System

6.1 Capabilities

Each channel declares what it supports. Capabilities are informed by the channel's media_types:

ChannelCapabilities

Text capabilities (channels with TEXT media type):
├── max_text_length: int | None         (SMS: 160, WhatsApp: 4096, Email: None)
├── supports_rich_text: bool            (Email: yes, SMS: no)
├── supports_buttons: bool
├── max_buttons: int | None
├── supports_cards: bool
├── supports_quick_replies: bool
├── supports_templates: bool

Media capabilities:
├── supports_media: bool
├── supported_media_types: list[str]    (["image/jpeg", "image/png", "audio/ogg", ...])
├── max_media_size_bytes: int | None

Audio capabilities (channels with AUDIO media type, Phase 4+):
├── supports_audio: bool
├── max_audio_duration_seconds: int | None
├── supported_audio_formats: list[str]  (["ogg", "mp3", "wav"])

Video capabilities (channels with VIDEO media type, Phase 4+):
├── supports_video: bool
├── max_video_duration_seconds: int | None
├── supported_video_formats: list[str]  (["mp4", "webm"])

Delivery:
├── supports_threading: bool
├── supports_typing_indicator: bool
├── supports_read_receipts: bool
├── delivery_mode: DeliveryMode         (instant, async, batch)
├── rate_limit: RateLimit | None

Extensible:
└── custom: dict[str, Any]

6.2 Capability-Aware AI

When an AI channel responds, the framework provides the target transport channel's capabilities and media types via the AIContext. The AI generates content appropriate for the target at generation time, not via post-processing:

SMS target (text only, 160 chars):
  → AIContext.target_capabilities = {max_text_length: 160, supports_rich_text: false}
  → AIContext.target_media_types = [TEXT]
  → AI prompt includes: "Keep under 160 chars. Plain text. Be direct."

Email target (text, rich):
  → AIContext.target_capabilities = {max_text_length: None, supports_rich_text: true}
  → AIContext.target_media_types = [TEXT]
  → AI prompt includes: "Use formatting. Include details. Professional tone."

WhatsApp target (text + audio + video):
  → AIContext.target_capabilities = {max_text_length: 4096, supports_buttons: true}
  → AIContext.target_media_types = [TEXT, AUDIO, VIDEO]
  → AI prompt includes: "Rich text supported. Buttons available. Can send voice notes."

Browser target (text, rich UI):
  → AIContext.target_capabilities = {supports_buttons: true, supports_cards: true}
  → AIContext.target_media_types = [TEXT]
  → AI prompt includes: "Rich text supported. Buttons and cards available."

The transport channel enforces hard limits as a safety net only. The AI provider receives the full context and decides how to generate.
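How an AI provider might turn target capabilities into prompt instructions is left to the integrator. The sketch below shows one possible mapping, with capabilities passed as a plain dictionary and media types as strings. `prompt_hints` and the exact hint wording are hypothetical.

```python
def prompt_hints(capabilities: dict, media_types: list[str]) -> list[str]:
    """Derive prompt instructions from the target channel's capabilities."""
    hints: list[str] = []
    max_len = capabilities.get("max_text_length")
    if max_len is not None:
        hints.append(f"Keep under {max_len} chars.")
    if capabilities.get("supports_rich_text"):
        hints.append("Use formatting.")
    else:
        hints.append("Plain text.")
    if capabilities.get("supports_buttons"):
        hints.append("Buttons available.")
    if "AUDIO" in media_types:
        hints.append("Can send voice notes.")
    return hints


sms_hints = prompt_hints(
    {"max_text_length": 160, "supports_rich_text": False}, ["TEXT"]
)
email_hints = prompt_hints(
    {"max_text_length": None, "supports_rich_text": True}, ["TEXT"]
)
```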

6.3 Content Transcoding (Non-AI Cross-Channel)

When an AI channel produces content, it generates capability-aware content at prompt time (§6.2). But what about human-to-human cross-channel? A user on WhatsApp sends a rich card with buttons → the SMS user can't display it.

The framework provides a pluggable ContentTranscoder:

class ContentTranscoder(ABC):
    """Converts content for target channels that don't support the original format."""

    async def transcode(
        self,
        content: EventContent,
        source_capabilities: ChannelCapabilities,
        target_capabilities: ChannelCapabilities,
    ) -> EventContent:
        """Convert content to be compatible with the target channel."""
        ...

Default transcoding rules (built-in):

RichContent (buttons + cards) → TextContent
  "Choose one: 1. Option A  2. Option B  3. Option C"

CarouselContent → Sequential TextContent
  "Product 1: Name - Description  |  Product 2: Name - Description"

MediaContent → TextContent (with URL)
  "[Image] https://example.com/image.jpg"

AudioContent → TextContent (with transcript or link)
  "[Voice message] [transcript] or [link]"

LocationContent → TextContent
  "[Location] 45.5017 N, 73.5673 W - Office HQ"

The integrator can override with a custom ContentTranscoder for domain-specific rules.
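Two of the default rules above (rich content with buttons, and location) can be sketched as a plain function. This is not the ContentTranscoder interface itself: capabilities are omitted, the content classes are reduced stand-ins, buttons are plain strings, and `to_text` is a hypothetical name.

```python
from dataclasses import dataclass, field


@dataclass
class TextContent:
    text: str


@dataclass
class RichContent:
    text: str
    buttons: list[str] = field(default_factory=list)


@dataclass
class LocationContent:
    latitude: float
    longitude: float
    label: str


def to_text(content) -> TextContent:
    """Flatten content into plain text for a text-only target."""
    if isinstance(content, RichContent):
        # Buttons become a numbered list of options.
        options = "  ".join(
            f"{i}. {label}" for i, label in enumerate(content.buttons, 1)
        )
        return TextContent(f"{content.text} {options}".strip())
    if isinstance(content, LocationContent):
        ns = "N" if content.latitude >= 0 else "S"
        ew = "E" if content.longitude >= 0 else "W"
        return TextContent(
            f"[Location] {abs(content.latitude)} {ns}, "
            f"{abs(content.longitude)} {ew} - {content.label}"
        )
    if isinstance(content, TextContent):
        return content
    raise ValueError(f"No text fallback for {type(content).__name__}")


menu = to_text(RichContent("Choose one:", ["Option A", "Option B", "Option C"]))
office = to_text(LocationContent(45.5017, -73.5673, "Office HQ"))
```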

Where transcoding happens in the pipeline:

The Event Router calls ContentTranscoder.transcode() before calling channel.deliver() for each target channel. This means:
1. Event Router receives an event with its original content.
2. For each target channel, the router checks media type compatibility.
3. If the content is incompatible with the target, the router transcodes it.
4. The transcoded content is passed to channel.deliver().
5. If transcoding fails (content cannot be represented), the event is not delivered to that channel and a delivery_status(failed) event is emitted.

AI channels bypass transcoding entirely: they receive the original content and generate capability-aware responses at prompt time (see §6.2).

6.4 Channel Fallback

When delivery to a channel fails, the framework can automatically try another channel (inspired by Sinch and MessageBird):

FallbackPolicy
├── priority: list[str]                 (ordered channel IDs to try: ["whatsapp", "sms", "email"])
├── timeout: timedelta                  (wait before trying next channel: e.g., 15 minutes)
├── on_failure: FallbackBehavior        (retry_next | give_up | hook)
└── max_retries: int                    (max channels to try before giving up)

# Set fallback policy on a room (applies to all delivery)
await fw.set_fallback_policy(room_id, FallbackPolicy(
    priority=["whatsapp", "sms", "email"],
    timeout=timedelta(minutes=15),
    on_failure="retry_next",
))

# Or per binding (channel-specific fallback)
await fw.attach_channel(room_id, "whatsapp",
    access="read_write",
    fallback=FallbackConfig(fallback_channel_id="sms", after=timedelta(minutes=15)),
)

The Event Router handles fallback: if deliver() returns DeliveryResult(status="failed"), check fallback policy and retry on next channel.
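The retry-on-next-channel behavior can be sketched as a loop over the priority list. This sketch deliberately ignores the policy's `timeout` (it treats a failure as immediate) and models each channel as a coroutine returning "sent" or "failed". `deliver_with_fallback` and the two sample channels are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Deliver = Callable[[str], Awaitable[str]]  # returns "sent" or "failed"


async def deliver_with_fallback(
    text: str,
    priority: list[str],
    channels: dict[str, Deliver],
    max_retries: int,
) -> Optional[str]:
    """Try channels in priority order; return the ID that accepted the message."""
    for channel_id in priority[:max_retries]:
        status = await channels[channel_id](text)
        if status == "sent":
            return channel_id
    return None  # every attempted channel failed


async def always_fail(text: str) -> str:
    return "failed"


async def always_send(text: str) -> str:
    return "sent"


channels = {"whatsapp": always_fail, "sms": always_send, "email": always_send}
order = ["whatsapp", "sms", "email"]
used = asyncio.run(deliver_with_fallback("hi", order, channels, max_retries=3))
gave_up = asyncio.run(deliver_with_fallback("hi", order, channels, max_retries=1))
```

Here `max_retries` is read as "maximum number of channels to try", matching the field description above.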

6.5 Template Support

WhatsApp (and some other channels) requires pre-approved templates for outbound messages outside the 24-hour session window. This is a hard requirement for WhatsApp support.

TemplateContent (EventContent variant)
├── template_id: str                    (the template identifier)
├── language: str                       (e.g., "fr", "en")
├── parameters: dict[str, Any]          (template variable values)
├── channel_templates: dict[ChannelType, ChannelTemplate]  (per-channel template config)
└── fallback: EventContent | None       (what to send if template not supported)

# Send a WhatsApp template message
event = RoomEvent(
    content=TemplateContent(
        template_id="order_confirmation",
        language="fr",
        parameters={"order_id": "1234", "total": "99.99"},
        fallback=TextContent(text="Votre commande #1234 a été confirmée. Total: 99.99$"),
    ),
)

Channels that support templates (WhatsApp, Viber) use the template. Others fall back to the fallback content.
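
A minimal sketch of that per-channel choice follows. The TEMPLATE_CHANNELS set and the content_for_channel helper are assumptions made for illustration, and the dataclasses are trimmed versions of the structures described above.

```python
from dataclasses import dataclass
from typing import Any, Optional, Union


@dataclass
class TextContent:
    text: str


@dataclass
class TemplateContent:
    template_id: str
    language: str
    parameters: dict[str, Any]
    fallback: Optional[TextContent] = None


# Assumption: channel types with provider-side template support.
TEMPLATE_CHANNELS = {"whatsapp", "viber"}


def content_for_channel(
    content: TemplateContent, channel_type: str
) -> Union[TemplateContent, TextContent, None]:
    """Return what to send on this channel, or None if nothing can be sent."""
    if channel_type in TEMPLATE_CHANNELS:
        return content
    # Other channels get the fallback, which may be absent.
    return content.fallback
```

A None result means the channel has neither template support nor a fallback; how the router reports that case is left open here.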


7. Participant & Identity

7.1 Participant

A Participant is a human (or system identity) in the Room:

Participant
├── id: str
├── role: ParticipantRole           (end_user, agent, system)
├── identity: Identity | None       (None if pending identification)
├── identification: IdentificationStatus  (identified | pending)
├── candidates: list[Identity] | None     (set when pending — the possible identities)
├── connected_via: list[str]        (channel IDs this participant uses)
├── status: ParticipantStatus       (active, idle, left)
├── display_name: str | None        (resolved name, or fallback like "Non identifié (+1555...)")
├── resolved_at: datetime | None    (when identification was confirmed)
├── resolved_by: str | None         (who resolved: "auto", "advisor", "customer", hook name)
└── joined_at: datetime

Identification status:

Status       Meaning
identified   Identity resolved: we know who this participant is
pending      Identity ambiguous or unknown: waiting for resolution

When a participant is pending:

- Messages are still delivered and stored (the conversation isn't blocked)
- candidates lists the possible identities (e.g., 3 family members)
- The advisor can resolve via API or the system can auto-resolve via hook
- Once resolved, identification=identified, identity is set, resolved_at/resolved_by are filled
- A participant_identified event is emitted in the timeline
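
The pending-to-identified transition can be sketched as a single state change. This is an illustration under assumptions: the dataclasses are trimmed, resolve_participant is a hypothetical helper, the returned dict stands in for the participant_identified event, and requiring the chosen identity to be one of the candidates is a choice made for this sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Identity:
    id: str
    display_name: str


@dataclass
class Participant:
    id: str
    identification: str = "pending"             # "identified" | "pending"
    identity: Optional[Identity] = None
    candidates: Optional[list[Identity]] = None
    resolved_at: Optional[datetime] = None
    resolved_by: Optional[str] = None


def resolve_participant(p: Participant, identity_id: str, resolved_by: str) -> dict:
    """Pick one candidate, update the participant, return the timeline event."""
    if p.identification != "pending":
        raise ValueError("participant is already identified")
    match = next((c for c in p.candidates or [] if c.id == identity_id), None)
    if match is None:
        raise ValueError(f"{identity_id} is not among the candidates")
    p.identification = "identified"
    p.identity = match
    p.candidates = None                          # cleared once resolved
    p.resolved_at = datetime.now(timezone.utc)
    p.resolved_by = resolved_by
    return {
        "type": "participant_identified",
        "identity_id": match.id,
        "resolved_by": resolved_by,
    }
```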

Important distinction:

- Participants = who is in the conversation (humans)
- Channels = how they connect and what processes events
- A Participant connects via one or more transport channels
- AI and integration channels are NOT participants; they're infrastructure
- The Room's participant list answers "who is in this conversation?"

7.2 Identity

Cross-channel identity resolution:

Identity
├── id: str
├── organization_id: str
├── display_name: str | None
├── channel_addresses: dict[ChannelType, list[str]]
│   (e.g., {sms: ["+15551234"], email: ["john@example.com"]})
├── external_id: str | None         (CRM contact ID)
└── metadata: dict[str, Any]

Pluggable resolution via IdentityResolver ABC:

- Address-based (match by phone/email)
- CRM-based (look up in external system)
- Explicit (integrator provides identity)
- Manual (human agent links identities)
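
As an illustration of the address-based strategy, the lookup below matches an inbound address against channel_addresses. It is a simplified sketch: channel types are plain strings, match_by_address is a hypothetical helper, and the returned status strings mirror the resolved / ambiguous / unknown outcomes used by the resolution pipeline.

```python
from dataclasses import dataclass, field


@dataclass
class Identity:
    id: str
    channel_addresses: dict[str, list[str]] = field(default_factory=dict)


def match_by_address(
    identities: list[Identity], channel_type: str, address: str
) -> tuple[str, list[Identity]]:
    """Return (status, matches) for an inbound channel address."""
    matches = [
        i for i in identities
        if address in i.channel_addresses.get(channel_type, [])
    ]
    if len(matches) == 1:
        return "resolved", matches
    if matches:
        return "ambiguous", matches      # e.g. a shared family phone number
    return "unknown", []
```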

7.3 Identity Resolution Pipeline

When an inbound event arrives (e.g., SMS webhook), the framework runs the identity resolution pipeline:

Inbound arrives (channel_type=SMS, address="+15551234567")
┌─────────────────────────────────────────────────────────────┐
│ IdentityResolver.resolve(channel_type, address, channel_data)│
│                                                             │
│ Returns: IdentityResult                                      │
│   status: "resolved" | "ambiguous" | "unknown"              │
│   participant: Participant | None     (if resolved)          │
│   candidates: list[Identity]          (if ambiguous)         │
│   address: str                        (raw channel address)  │
│   channel_type: ChannelType                                  │
└─────────────┬───────────────────────────────────────────────┘
     ┌────────┼────────────────────┐
     ▼        ▼                    ▼
  RESOLVED    AMBIGUOUS            UNKNOWN
  (1 match)   (N matches)         (0 matches)
     │        │                    │
     │        ▼                    ▼
     │   on_identity_ambiguous   on_identity_unknown
     │   sync hook               sync hook
     │        │                    │
     │   Returns one of:      Returns one of:
     │   • resolved(identity)  • create(new_identity)
     │   • pending(candidates) • pending()
     │   • challenge(inject)   • challenge(inject)
     │   • reject()            • reject()
     │        │                    │
     ▼        ▼                    ▼
  Proceed with identified    Proceed with pending
  or pending participant     or rejected

IdentityResolver ABC:

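from dataclasses import dataclass, field
from typing import Literal

# Assumes a keyword-only dataclass (Python 3.10+), so optional fields can be omitted.
@dataclass(kw_only=True)
class IdentityResult:
    """Result of identity resolution: resolved, ambiguous, or unknown."""
    status: Literal["resolved", "ambiguous", "unknown"]
    participant: Participant | None = None                    # set if status=resolved
    candidates: list[Identity] = field(default_factory=list)  # set if status=ambiguous (2+)
    address: str                          # raw channel address (+15551234567)
    channel_type: ChannelType             # SMS, email, whatsapp, etc.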

class IdentityResolver(ABC):
    """Pluggable identity resolution — integrator implements this."""

    @abstractmethod
    async def resolve(
        self,
        channel_type: ChannelType,
        address: str,
        channel_data: dict[str, Any],
    ) -> IdentityResult:
        """Resolve a channel address to a participant.

        Examples:
        - CRM lookup: SELECT * FROM contacts WHERE phone = address
        - Address book lookup: match by phone or email
        - External API: call CRM service
        """
        ...

Identity hook result:

class IdentityHookResult:
    """What the integrator's hook returns when identity is ambiguous or unknown."""

    # Return annotations are quoted: the class name is not bound yet inside its own body.
    @staticmethod
    def resolved(identity: Identity) -> "IdentityHookResult":
        """Resolved: we know who this is. Proceed with identified participant."""
        ...

    @staticmethod
    def create(new_identity: Identity) -> "IdentityHookResult":
        """Create: register a new identity (listed for on_identity_unknown in the pipeline above)."""
        ...

    @staticmethod
    def pending(
        display_name: str | None = None,
        candidates: list[Identity] | None = None,
    ) -> "IdentityHookResult":
        """Pending: create participant with identification=pending. Advisor resolves later."""
        ...

    @staticmethod
    def challenge(inject: InjectedEvent) -> "IdentityHookResult":
        """Challenge: hold the message, ask the sender to self-identify."""
        ...

    @staticmethod
    def reject(reason: str = "Unknown sender") -> "IdentityHookResult":
        """Reject: do not create room or participant."""
        ...

Example — CRM resolver with ambiguity handling:

# 1. Integrator implements the resolver (CRM lookup)
class CRMIdentityResolver(IdentityResolver):
    def __init__(self, crm_client: CRMClient):
        self.crm = crm_client

    async def resolve(self, channel_type, address, channel_data) -> IdentityResult:
        contacts = await self.crm.find_contacts_by_phone(address)

        if len(contacts) == 1:
            return IdentityResult(
                status="resolved",
                participant=self._to_participant(contacts[0]),
                address=address,
                channel_type=channel_type,
            )
        elif len(contacts) > 1:
            return IdentityResult(
                status="ambiguous",
                candidates=[self._to_identity(c) for c in contacts],
                address=address,
                channel_type=channel_type,
            )
        else:
            return IdentityResult(
                status="unknown",
                address=address,
                channel_type=channel_type,
            )

# 2. Integrator registers the resolver
fw = RoomKit(
    store=PostgresStore(url="..."),
    identity_resolver=CRMIdentityResolver(crm_client),
)

# 3. Integrator handles ambiguity (one hook, any strategy)
@fw.hook(trigger="on_identity_ambiguous", execution="sync")
async def handle_ambiguous(
    event: InboundResult,
    resolution: IdentityResult,
) -> IdentityHookResult:
    # Strategy: check for recent active room with any candidate
    for candidate in resolution.candidates:
        recent = await fw.store.find_latest_room(
            participant_id=candidate.id,
            status="active",
        )
        if recent:
            return IdentityHookResult.resolved(identity=candidate)

    # No recent context — let advisor decide
    return IdentityHookResult.pending(
        display_name=f"Non identifié ({resolution.address})",
        candidates=resolution.candidates,
    )

8. Integration Surfaces

8.1 REST API

For humans, dashboards, and external systems:

Rooms
  POST   /rooms                          Create room (with metadata)
  GET    /rooms/{id}                     Get room (includes channel info, metadata)
  PATCH  /rooms/{id}                     Update room (status, metadata)
  DELETE /rooms/{id}                     Close room
  GET    /rooms?org_id=X&status=active   List rooms (filterable by metadata)

Channels (dynamic, with permissions + metadata)
  POST   /rooms/{id}/channels            Attach channel (access, visibility, metadata)
  DELETE /rooms/{id}/channels/{cid}      Detach channel
  PATCH  /rooms/{id}/channels/{cid}      Update access, visibility, muted, metadata
  POST   /rooms/{id}/channels/{cid}/mute     Mute channel
  POST   /rooms/{id}/channels/{cid}/unmute   Unmute channel
  GET    /rooms/{id}/channels            List channels (permissions + info + direction + media_types)

Registered Channels (framework-level)
  GET    /channels                       List registered channels (type, direction, media_types, info)
  GET    /channels/{id}                  Get channel details (capabilities, info, status)

Events & Timeline
  POST   /rooms/{id}/events              Send event into room
  GET    /rooms/{id}/timeline            Get timeline (paginated)
  GET    /rooms/{id}/timeline?visibility=all  Filter by visibility

Participants
  POST   /rooms/{id}/participants        Add participant
  DELETE /rooms/{id}/participants/{pid}  Remove participant
  GET    /rooms/{id}/participants        List participants (includes identification status)
  POST   /rooms/{id}/participants/{pid}/resolve   Resolve pending participant
         body: { "identity_id": "..." }            (advisor picks from candidates)
  GET    /rooms/{id}/participants?identification=pending   List unresolved participants

Tasks & Observations
  GET    /rooms/{id}/tasks               Room tasks
  GET    /tasks?org_id=X&status=pending  All pending tasks
  PATCH  /tasks/{id}                     Update task
  GET    /rooms/{id}/observations        Room observations

Hooks
  POST   /rooms/{id}/hooks               Add room-level hook
  DELETE /rooms/{id}/hooks/{hid}         Remove hook
  GET    /rooms/{id}/hooks               List hooks

Identity
  POST   /identities                     Create
  GET    /identities/resolve             Resolve by channel+address
  PATCH  /identities/{id}                Link addresses

Webhooks (inbound from providers)
  POST   /webhooks/{channel_type}/{provider}         Inbound message
  POST   /webhooks/{channel_type}/{provider}/status   Delivery status

Real-time (WebSocket)
  WS     /ws/{room_id}                   WebSocket connection for a room participant
  GET    /rooms/{id}/connections          List active connections in room

8.2 MCP Server

For AI agents to interact with Rooms natively:

MCP Tools (actions):

send_message(room_id, text)                    Send a message into a Room
create_task(room_id, type, action, data)       Create a task
add_observation(room_id, type, data)           Log an observation
attach_channel(room_id, channel_config)        Add a channel
detach_channel(room_id, channel_id)            Remove a channel
mute_channel(room_id, channel_id)              Mute a channel
unmute_channel(room_id, channel_id)            Unmute a channel
set_channel_visibility(room_id, cid, vis)      Change visibility
update_room_metadata(room_id, key, value)      Enrich Room data
resolve_identity(channel, address)             Look up identity
escalate_to_human(room_id, reason)             Request human handoff

MCP Resources (data):

room://{room_id}                               Room details
room://{room_id}/timeline                      Conversation history
room://{room_id}/participants                  Who is in the room
room://{room_id}/channels                      Attached channels + permissions
room://{room_id}/tasks                         Tasks for this room
identity://{identity_id}                       Identity details

8.3 Both Surfaces, Same Core

# REST and MCP both call the same RoomKit core:

# REST:
@router.post("/rooms/{room_id}/channels/{cid}/mute")
async def mute_channel(room_id: str, cid: str):
    return await fw.mute(room_id, cid)

# MCP:
@mcp.tool()
async def mute_channel(room_id: str, channel_id: str):
    return await fw.mute(room_id, channel_id)

# Same fw.mute() underneath — same hooks, same timeline event, same store.

8.4 Inbound Room Routing

When an inbound event arrives (e.g., SMS webhook), the framework must decide which Room to route it to. This is process_inbound's responsibility:

Inbound arrives (channel_id, raw_payload)
      |
      v
Channel.handle_inbound(raw_payload) -> InboundResult
      |
      v
IdentityResolver.resolve(channel_type, address, channel_data) -> IdentityResult
      |
      v
Room Routing Strategy (pluggable):
      |
      +-- Default: find latest active Room where this participant
      |   is connected via the same channel type.
      |   If found -> route to existing Room.
      |   If not found -> create new Room, fire on_room_created hook.
      |
      +-- Custom: integrator provides a RoomRouter implementation:
          async def route(participant, channel_type, channel_data) -> Room | None
          Returns existing Room or None (framework creates new Room).

RoomRouter ABC:

class RoomRouter(ABC):
    """Pluggable room routing -- integrator decides which room an inbound goes to."""

    @abstractmethod
    async def route(
        self,
        participant: Participant,
        channel_type: ChannelType,
        channel_data: dict[str, Any],
    ) -> Room | None:
        """Return an existing Room to route to, or None to create a new Room.

        Examples:
        - Find latest active room for this participant (default behavior)
        - Match by external ticket ID in channel_data
        - Always create a new room (stateless mode)
        """
        ...

The default implementation (DefaultRoomRouter) finds the latest active Room where the participant is connected via the same channel type. Integrators can override this for ticket-based routing, round-robin assignment, etc.
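
A sketch of that default lookup, under assumptions: rooms are held in memory, "latest" is taken to mean the most recent updated_at, and the Room fields and default_route name are hypothetical simplifications of the real store query.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Room:
    id: str
    status: str                              # "active", "closed", ...
    updated_at: datetime
    # (participant_id, channel_type) pairs connected to this room
    connections: set[tuple[str, str]]


def default_route(
    rooms: list[Room], participant_id: str, channel_type: str
) -> Optional[Room]:
    """Latest active room where the participant uses this channel type.

    Returning None tells the caller to create a new room.
    """
    matches = [
        r for r in rooms
        if r.status == "active" and (participant_id, channel_type) in r.connections
    ]
    return max(matches, key=lambda r: r.updated_at, default=None)
```

A ticket-based router would replace the filter with a lookup on an external ticket ID carried in channel_data, keeping the same "Room or None" contract.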

on_room_created hook behavior:

The on_room_created hook fires after the Room is persisted but before the inbound event is processed. This means:

- The Room exists in the store and has an ID.
- The hook can safely call fw.attach_channel(); there is no re-entrancy issue because the room lock is not held during hook execution.
- The inbound event is queued and processed after the hook completes.
- Use execution="async" for on_room_created hooks to avoid blocking the inbound pipeline.


9. Event Flow — Complete Scenario

Customer (SMS) + AI → Advisor joins → AI becomes assistant → AI unmuted

STEP 1: Room created with SMS + AI
─────────────────────────────────

Customer sends SMS: "Bonjour, j'ai besoin d'aide avec mon compte"

  → Twilio webhook → POST /webhooks/sms/twilio
  → SMS Channel parses, resolves identity
  → Framework: no open Room → create_room()
  → on_room_created hook (integrator logic):
      attach AI channel: access=read_write, visibility=all, muted=false
  → RoomEvent(type=message) added to timeline

  Room channels:
  ┌──────────┬────────────┬─────────┬────────────┐
  │ Channel  │ Access     │ Muted   │ Visibility │
  ├──────────┼────────────┼─────────┼────────────┤
  │ SMS      │ read_write │ false   │ all        │
  │ AI       │ read_write │ false   │ all        │
  └──────────┴────────────┴─────────┴────────────┘

  → Event Router broadcasts to AI (read access ✓)
  → AI responds: "Bonjour! Comment puis-je vous aider?"
  → AI has write access ✓, not muted ✓, visibility=all
  → SMS Channel delivers to customer ✓

STEP 2: Advisor joins — business logic mutes AI
────────────────────────────────────────────────

Advisor opens dashboard → WebSocket to room

  → on_channel_attached hook (integrator logic):
      if new_channel is agent transport:
          fw.mute(room_id, ai_channel_id)                              # AI can't talk
          fw.set_visibility(room_id, ai_channel_id, channels(["http"])) # when unmuted, only agents see

  Room channels:
  ┌──────────┬────────────┬─────────┬────────────────┐
  │ Channel  │ Access     │ Muted   │ Visibility     │
  ├──────────┼────────────┼─────────┼────────────────┤
  │ SMS      │ read_write │ false   │ all            │
  │ AI       │ read_write │ true    │ channels(http) │  ← muted + visibility changed
  │ HTTP     │ read_write │ false   │ all            │  ← new
  └──────────┴────────────┴─────────┴────────────────┘

  Customer SMS: "Je ne peux plus me connecter"
  → Event Router:
    → SMS: source, skip
    → AI: has read access ✓ → receives event → produces ChannelOutput:
        events: ["Il a eu 3 tentatives échouées. Suggérer reset mot de passe."]
        BUT AI is muted → events are BLOCKED by Event Router
        tasks: [Task(type="insight", data="3 failed logins")]
        observations: [Observation(type="intent", value="login_issue")]
        → Side effects ARE stored (not subject to mute)
    → HTTP: pushes customer message to advisor's browser ✓

  Advisor sees:
    - Customer: "Je ne peux plus me connecter"
    - [Task from AI: 3 failed login attempts detected]  (via REST API / dashboard)

STEP 3: Advisor unmutes AI as assistant
───────────────────────────────────────

Advisor clicks "Enable AI suggestions" in dashboard

  → API: POST /rooms/{id}/channels/{ai_id}/unmute
  → AI is no longer muted, but visibility = channels(["http"])

  Room channels:
  ┌──────────┬────────────┬─────────┬────────────────┐
  │ Channel  │ Access     │ Muted   │ Visibility     │
  ├──────────┼────────────┼─────────┼────────────────┤
  │ SMS      │ read_write │ false   │ all            │
  │ AI       │ read_write │ false   │ channels(http) │  ← unmuted, but only agents see
  │ HTTP     │ read_write │ false   │ all            │
  └──────────┴────────────┴─────────┴────────────────┘

  Customer SMS: "Est-ce que je peux changer mon mot de passe?"
  → AI responds: "Le client peut reset via Paramètres > Sécurité > Réinitialiser"
  → AI write access ✓, not muted ✓, visibility=channels(["http"])
  → HTTP Channel delivers AI suggestion to advisor ✓
  → SMS Channel does NOT receive it (not in visibility list) ✓

  Advisor sees: [AI suggestion] "Le client peut reset via Paramètres > Sécurité..."
  Customer sees: nothing from AI

STEP 4: Advisor responds manually
──────────────────────────────────

Advisor types: "Allez dans Paramètres, puis Sécurité, puis Réinitialiser le mot de passe."
  → HTTP source, visibility=all
  → SMS delivers to customer ✓
  → AI receives (read access), produces observation: intent=password_reset ✓

STEP 5: Advisor lets AI respond directly
────────────────────────────────────────

Advisor clicks "Let AI respond to customer" in dashboard

  → API: PATCH /rooms/{id}/channels/{ai_id}  body: {visibility: "all"}

  Room channels:
  ┌──────────┬────────────┬─────────┬────────────┐
  │ Channel  │ Access     │ Muted   │ Visibility │
  ├──────────┼────────────┼─────────┼────────────┤
  │ SMS      │ read_write │ false   │ all        │
  │ AI       │ read_write │ false   │ all        │  ← now customer sees AI again
  │ HTTP     │ read_write │ false   │ all        │
  └──────────┴────────────┴─────────┴────────────┘

  AI can now respond directly to customer via SMS.

STEP 6: Advisor leaves
──────────────────────

  → on_channel_detached hook: no special logic (AI already has visibility=all)
  → Room continues with SMS + AI, as in Step 1.

Timeline:
┌────┬──────────┬─────────────────────────────────────────────┬───────────┬──────────┐
│ #  │ Source   │ Content                                     │ Type      │ Visible  │
├────┼──────────┼─────────────────────────────────────────────┼───────────┼──────────┤
│ 1  │ system   │ Room created                                │ system    │ internal │
│ 2  │ system   │ AI channel attached (rw, all)               │ ch_attach │ internal │
│ 3  │ SMS      │ "Bonjour, j'ai besoin d'aide"              │ message   │ all      │
│ 4  │ AI       │ "Bonjour! Comment puis-je vous aider?"     │ message   │ all      │
│ 5  │ system   │ HTTP channel attached (rw, all)             │ ch_attach │ internal │
│ 6  │ system   │ AI channel muted                            │ ch_muted  │ internal │
│ 7  │ system   │ AI visibility → channels(http)              │ ch_update │ internal │
│ 8  │ SMS      │ "Je ne peux plus me connecter"             │ message   │ all      │
│ 9  │ AI       │ Task: 3 failed logins detected              │ task      │ internal │
│ 10 │ system   │ AI channel unmuted                          │ ch_unmute │ internal │
│ 11 │ SMS      │ "Est-ce que je peux changer mon mdp?"      │ message   │ all      │
│ 12 │ AI       │ "Reset via Paramètres > Sécurité"          │ message   │ http     │
│ 13 │ HTTP     │ "Allez dans Paramètres, puis Sécurité..."  │ message   │ all      │
│ 14 │ system   │ AI visibility → all                         │ ch_update │ internal │
│ 15 │ system   │ HTTP channel detached                       │ ch_detach │ internal │
└────┴──────────┴─────────────────────────────────────────────┴───────────┴──────────┘
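
The access, mute, and visibility rules exercised in Steps 1 to 6 can be condensed into one recipient-selection function. This is a hedged sketch: Binding and recipients are hypothetical names, only read_write appears in the scenario (the other access values are assumed), and side effects such as tasks and observations are out of scope here.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class Binding:
    channel_id: str
    access: str = "read_write"       # assumed alternatives: "read_only", "write_only"
    muted: bool = False
    visibility: Union[str, set[str]] = "all"   # "all" or a set of channel IDs


def recipients(bindings: dict[str, Binding], source_id: str) -> list[str]:
    """Channel IDs that receive a message emitted by source_id."""
    source = bindings[source_id]
    if source.muted or "write" not in source.access:
        return []                    # a muted source's messages are blocked
    targets = []
    for b in bindings.values():
        if b.channel_id == source_id:
            continue                 # never echo back to the source
        if "read" not in b.access:
            continue
        if source.visibility != "all" and b.channel_id not in source.visibility:
            continue                 # outside the source's visibility list
        targets.append(b.channel_id)
    return targets
```

Muting only silences a channel's own output: a muted AI channel still appears among the recipients of customer messages, which is what lets it keep producing tasks and observations in Step 2.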

Sensitivity Scanning — Block SIN + Targeted Responses (Use Case 6)

A customer on SMS sends their Social Insurance Number to a financial advisor on WebSocket. A sync hook intercepts, blocks the message, and injects targeted responses to both parties.

SETUP: Room with SMS (customer) + WebSocket (advisor)
──────────────────────────────────────────────────────

Room: "advisor-session-42"
┌──────────┬────────────┬─────────┬────────────┐
│ Channel  │ Access     │ Muted   │ Visibility │
├──────────┼────────────┼─────────┼────────────┤
│ SMS      │ read_write │ false   │ all        │  ← customer
│ WS       │ read_write │ false   │ all        │  ← financial advisor
└──────────┴────────────┴─────────┴────────────┘

Global sync hook registered: sensitivity_scanner (priority=0)


STEP 1: Customer sends SIN via SMS
───────────────────────────────────

Customer SMS: "Bonjour, mon NAS est 123-456-789"

  → Twilio webhook → POST /webhooks/sms/twilio
  → SMSChannel.handle_inbound() → InboundResult(event)
  → Event enters pipeline, gets index assigned (index=5)

  → Sync Hook Pipeline:
    → [0] sensitivity_scanner:
        - Inspects event.content.text
        - Regex matches SIN pattern: \b\d{3}[-\s]?\d{3}[-\s]?\d{3}\b ✓
        - Returns HookResult.block(...)


STEP 2: Framework processes the block
──────────────────────────────────────

  1. STORE original event with status=BLOCKED:
     RoomEvent(
         index=5,
         type=message,
         content="Bonjour, mon NAS est 123-456-789",
         source_channel="sms",
         status=BLOCKED,                          ← marked as blocked
         blocked_by="sensitivity_scanner",         ← which hook blocked it
     )
     → Stored in timeline for audit trail
     → NOT delivered to any channel

  2. DELIVER injected events:

     InjectedEvent #1 → target_channel="sms" (customer):
       RoomEvent(
           index=6,
           type=message,
           content="Votre message a été bloqué. Les numéros d'assurance
                    sociale ne peuvent pas être envoyés par SMS pour des
                    raisons de sécurité.",
           source_channel="system",
           status=DELIVERED,
       )
       → SMSChannel.deliver() → Twilio sends SMS to customer ✓

     InjectedEvent #2 → target_channels=[all except "sms"] (advisor):
       RoomEvent(
           index=7,
           type=message,
           content="Le client a tenté d'envoyer des données sensibles (NAS).
                    Le message a été bloqué pour des raisons de sécurité.",
           source_channel="system",
           status=DELIVERED,
       )
       → WebSocketChannel.deliver() → pushes to advisor's browser ✓

  3. PROCESS side effects:
     Observation(
         type="sensitivity_violation",
         data={"pattern": "SIN", "channel": "sms", "room_id": "advisor-session-42"}
     )
     → Stored + dispatched to on_observation hooks (compliance logging)


STEP 3: What each party sees
─────────────────────────────

  Customer (SMS):
    ✗ Their original message was NOT echoed back (standard SMS behavior)
    ✓ Receives: "Votre message a été bloqué. Les numéros d'assurance sociale
                 ne peuvent pas être envoyés par SMS..."

  Advisor (WebSocket):
    ✗ Does NOT see "Bonjour, mon NAS est 123-456-789" (blocked)
    ✓ Sees: "Le client a tenté d'envoyer des données sensibles (NAS)..."

  Compliance team (via API / async hooks):
    ✓ Can query blocked events: GET /api/v1/rooms/{id}/events?status=blocked
    ✓ Observation logged: sensitivity_violation with full context


Timeline:
┌────┬──────────┬──────────────────────────────────────────────┬──────────┬──────────┬─────────┐
│ #  │ Source   │ Content                                      │ Type     │ Status   │ Visible │
├────┼──────────┼──────────────────────────────────────────────┼──────────┼──────────┼─────────┤
│ 1  │ system   │ Room created                                 │ system   │ delivered│ internal│
│ 2  │ system   │ SMS channel attached (rw, all)               │ ch_attach│ delivered│ internal│
│ 3  │ system   │ WS channel attached (rw, all)                │ ch_attach│ delivered│ internal│
│ 4  │ SMS      │ "Bonjour, j'ai une question..."             │ message  │ delivered│ all     │
│ 5  │ SMS      │ "Bonjour, mon NAS est 123-456-789"          │ message  │ BLOCKED  │ —       │
│ 6  │ system   │ "Votre message a été bloqué..."             │ message  │ delivered│ sms     │
│ 7  │ system   │ "Le client a tenté d'envoyer..."            │ message  │ delivered│ ws      │
│    │          │ Observation: sensitivity_violation            │ observ.  │ —        │ internal│
└────┴──────────┴──────────────────────────────────────────────┴──────────┴──────────┴─────────┘

Key framework behaviors demonstrated:

- Sync hook blocks before broadcast: the advisor never sees the SIN
- InjectedEvent with target_channel: different messages to different channels
- Blocked event stored with audit trail: status=BLOCKED, blocked_by=hook_name
- Observations as side effects: compliance logging independent of event delivery
- The scanner can be regex, AI, or external service: the framework doesn't care what's inside the hook
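
A regex-based scanner along these lines might look like the sketch below. The pattern is the one from Step 1; HookOutcome, Injected, and the "source" / "others" targeting labels are hypothetical simplifications of HookResult.block(...) and InjectedEvent, and the message texts are placeholders.

```python
import re
from dataclasses import dataclass, field
from typing import Optional

# Same pattern as in Step 1: three groups of three digits.
SIN_PATTERN = re.compile(r"\b\d{3}[-\s]?\d{3}[-\s]?\d{3}\b")


@dataclass
class Injected:
    text: str
    target: str                      # "source" or "others" (assumed targeting labels)


@dataclass
class HookOutcome:
    blocked: bool
    blocked_by: Optional[str] = None
    injected: list[Injected] = field(default_factory=list)
    observations: list[dict] = field(default_factory=list)


def sensitivity_scanner(text: str) -> HookOutcome:
    """Block messages containing a SIN-like number and explain why."""
    if not SIN_PATTERN.search(text):
        return HookOutcome(blocked=False)
    return HookOutcome(
        blocked=True,
        blocked_by="sensitivity_scanner",
        injected=[
            Injected("Your message was blocked for security reasons.", "source"),
            Injected("The customer tried to send sensitive data (SIN).", "others"),
        ],
        observations=[{"type": "sensitivity_violation", "pattern": "SIN"}],
    )
```

The pattern matches any nine digits grouped in threes, so a production scanner would likely add a checksum or context check to limit false positives.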

Ambiguous Identity — Shared Family Phone Number (Use Case 7)

A customer sends an SMS from a phone number shared by 3 family members. The CRM returns 3 contacts. The framework creates a pending participant and the advisor resolves manually.

STEP 1: SMS arrives, identity resolver runs
────────────────────────────────────────────

SMS: "Bonjour, j'aimerais modifier mon placement"
from: +15551234567

  → Twilio webhook → POST /webhooks/sms/twilio
  → SMSChannel.handle_inbound() → InboundResult
  → Framework calls IdentityResolver.resolve(SMS, "+15551234567")

  → Integrator's CRM resolver:
      SELECT * FROM contacts WHERE phone = '+15551234567'
      → 3 results: Marie Tremblay, Jean Tremblay, Sophie Tremblay

  → Returns: IdentityResult(
        status="ambiguous",
        candidates=[Marie, Jean, Sophie],
        address="+15551234567",
        channel_type=SMS,
    )


STEP 2: Framework fires on_identity_ambiguous hook
───────────────────────────────────────────────────

  → Integrator's hook runs:
    - Checks for recent active room with any candidate → none found
    - Returns: IdentityHookResult.pending(
          display_name="Non identifié (+15551234567)",
          candidates=[Marie, Jean, Sophie],
      )

  → Framework creates participant with:
      identification=pending
      candidates=[Marie, Jean, Sophie]
      display_name="Non identifié (+15551234567)"


STEP 3: Room created with pending participant
─────────────────────────────────────────────

  Room: "session-99"
  ┌──────────┬────────────┬───────────────────────────────────────────┐
  │ Channel  │ Access     │ Participant                               │
  ├──────────┼────────────┼───────────────────────────────────────────┤
  │ SMS      │ read_write │ PENDING — candidates: Marie, Jean, Sophie│
  │ WS       │ read_write │ Advisor Dupont (identified)              │
  └──────────┴────────────┴───────────────────────────────────────────┘

  → Message "Bonjour, j'aimerais modifier mon placement" enters room
  → Event stored with participant_id=pending_participant_id
  → Event delivered normally to advisor via WebSocket

  Advisor's dashboard shows:
  ┌─────────────────────────────────────────────────────────┐
  │ ⚠ Participant non identifié (+15551234567)              │
  │                                                         │
  │ Message: "Bonjour, j'aimerais modifier mon placement"  │
  │                                                         │
  │ Qui est-ce?                                             │
  │ ○ Marie Tremblay (dernière conversation: 2 jan.)       │
  │ ○ Jean Tremblay (dernière conversation: 15 déc.)       │
  │ ○ Sophie Tremblay (aucune conversation)                │
  │                                                         │
  │ [Confirmer]                                             │
  └─────────────────────────────────────────────────────────┘


STEP 4: Advisor resolves identity
─────────────────────────────────

  Advisor selects "Marie Tremblay":
  → API: POST /api/v1/rooms/session-99/participants/{pending_id}/resolve
         body: { "identity_id": "marie-tremblay-uuid" }

  → Framework:
    1. Updates participant:
         identification=identified
         identity=Marie Tremblay
         resolved_at=now
         resolved_by="advisor"
         candidates=None (cleared)

    2. Updates ChannelBinding:
         participant_id → marie-tremblay-uuid

    3. Emits event: participant_identified
         RoomEvent(
             type=participant_identified,
             content=SystemContent(
                 code="participant_identified",
                 message="Participant identifié: Marie Tremblay",
                 data={"identity_id": "marie-tremblay-uuid", "resolved_by": "advisor"}
             ),
         )

    4. Fires hook: on_participant_identified
         → Integrator can load conversation history, CRM context, etc.


STEP 5: Conversation continues normally
────────────────────────────────────────

  Room now has identified participant:
  ┌──────────┬────────────┬───────────────────────────────┐
  │ Channel  │ Access     │ Participant                   │
  ├──────────┼────────────┼───────────────────────────────┤
  │ SMS      │ read_write │ Marie Tremblay (identified)  │
  │ WS       │ read_write │ Advisor Dupont (identified)  │
  └──────────┴────────────┴───────────────────────────────┘

  Advisor: "Bonjour Marie! Quel placement souhaitez-vous modifier?"
  → Normal conversation flow, identity resolved.


Timeline:
┌────┬──────────┬──────────────────────────────────────────────┬───────────────┬──────────┐
│ #  │ Source   │ Content                                      │ Type          │ Status   │
├────┼──────────┼──────────────────────────────────────────────┼───────────────┼──────────┤
│ 1  │ system   │ Room created                                 │ system        │ delivered│
│ 2  │ system   │ SMS attached (participant=PENDING,           │ ch_attach     │ delivered│
│    │          │  candidates=[Marie, Jean, Sophie])           │               │          │
│ 3  │ system   │ WS attached (participant=Advisor)            │ ch_attach     │ delivered│
│ 4  │ SMS      │ "Bonjour, j'aimerais modifier mon placement"│ message       │ delivered│
│    │          │  participant_id=PENDING                      │               │          │
│ 5  │ system   │ Participant resolved → Marie Tremblay        │ participant_  │ delivered│
│    │          │  resolved_by=advisor                         │ identified    │          │
│ 6  │ WS       │ "Bonjour Marie! Quel placement..."           │ message       │ delivered│
└────┴──────────┴──────────────────────────────────────────────┴───────────────┴──────────┘

Key framework behaviors demonstrated:

  • IdentityResolver returns ambiguous result: the framework doesn't guess, it asks
  • on_identity_ambiguous hook: integrator picks the strategy (auto, pending, challenge)
  • Pending participant: conversation flows normally while identity is unresolved
  • Manual resolution via REST API: advisor sees candidates and picks one
  • participant_identified event: clean audit trail in the timeline
  • on_participant_identified hook: integrator can react (load CRM context, etc.)
  • No spaghetti: resolver + one hook + API endpoint. Same pattern as everything else.

AI ↔ AI — Multi-Agent Collaboration (Use Case 8)

Two AI agents collaborate in a Room: an analyst reads documents and produces findings, a writer generates a client-ready report from those findings. An advisor supervises via WebSocket.

STEP 1: Room setup — analyst + writer + advisor
────────────────────────────────────────────────

  Integrator creates room and attaches 3 channels:

  Room: "report-42"
  ┌──────────┬────────────┬─────────────────────────┬─────────────┐
  │ Channel  │ Access     │ Visibility              │ Pattern     │
  ├──────────┼────────────┼─────────────────────────┼─────────────┤
  │ analyst  │ read_write │ all                     │ Direct      │
  │ writer   │ read_write │ all                     │ Direct      │
  │ ws_adv   │ read_write │ all                     │ Direct      │
  └──────────┴────────────┴─────────────────────────┴─────────────┘


STEP 2: Advisor triggers the chain
───────────────────────────────────

  Advisor sends: "Analyse le dossier client #1234 et produis un rapport"
  → RoomEvent(source=ws_adv, chain_depth=0)

  → Event Router broadcasts to all channels with read access:
    → analyst.on_event() receives the message (chain_depth=0)
    → writer.on_event() receives the message (chain_depth=0)
      → writer sees it's not from analyst, returns empty ChannelOutput


STEP 3: Analyst responds
─────────────────────────

  analyst.on_event() produces:
    ChannelOutput(
        events=[RoomEvent(
            content=TextContent(text="Analyse du dossier #1234:\n"
                "1. Portefeuille équilibré — 60% actions, 40% obligations\n"
                "2. Rendement YTD: +8.2%\n"
                "3. Risque: Concentration dans le secteur technologique"),
        )],
        observations=[Observation(type="analysis_complete", data={...})],
    )

  → Framework stores event with chain_depth=1 (parent was 0)
  → Event Router broadcasts to all channels:
    → writer.on_event() receives analyst's findings (chain_depth=1)
    → ws_adv delivers findings to advisor's browser


STEP 4: Writer responds to analyst
───────────────────────────────────

  writer.on_event() produces:
    ChannelOutput(
        events=[RoomEvent(
            content=TextContent(text="Rapport pour le client:\n\n"
                "Cher client,\n\n"
                "Votre portefeuille affiche une performance de +8.2%...\n"
                "Recommandation: Rééquilibrer le secteur technologique..."),
        )],
        tasks=[Task(type="review", action="advisor_approval", data={...})],
    )

  → Framework stores event with chain_depth=2 (parent was 1)
  → Event Router broadcasts:
    → analyst.on_event() receives writer's report (chain_depth=2)
    → ws_adv delivers report to advisor's browser
  → Task "advisor_approval" stored as side effect


STEP 5: Chain continues (analyst refines)
──────────────────────────────────────────

  analyst.on_event() reviews writer's report:
    → "J'ajoute: le client a exprimé une tolérance au risque modérée"
    → chain_depth=3

  writer.on_event() updates report:
    → "Rapport mis à jour avec la mention du profil de risque"
    → chain_depth=4

  ... chain continues as needed ...


STEP 6: Chain depth limit (safety)
──────────────────────────────────

  If the back-and-forth reaches max_event_chain_depth (default 10):

  → writer produces response at chain_depth=10
  → Framework blocks it:
      status=blocked
      blocked_by="event_chain_depth_limit"
  → Observation emitted: type="event_chain_depth_exceeded"
  → Side effects still flow (tasks, observations from the blocked response)
  → Advisor sees the observation and can react (mute one AI, or trigger summary)


Timeline:
┌────┬──────────┬──────────────────────────────────────────┬───────┬──────────┐
│ #  │ Source   │ Content (truncated)                      │ Depth │ Status   │
├────┼──────────┼──────────────────────────────────────────┼───────┼──────────┤
│ 1  │ ws_adv   │ "Analyse le dossier #1234..."            │ 0     │ delivered│
│ 2  │ analyst  │ "Analyse: 1. Portefeuille équilibré..."  │ 1     │ delivered│
│ 3  │ writer   │ "Rapport: Cher client..."                │ 2     │ delivered│
│ 4  │ analyst  │ "J'ajoute: tolérance au risque..."       │ 3     │ delivered│
│ 5  │ writer   │ "Rapport mis à jour..."                  │ 4     │ delivered│
│ ...│ ...      │ ...                                      │ ...   │ ...      │
│ 11 │ writer   │ (blocked, depth limit reached)           │ 10    │ blocked  │
└────┴──────────┴──────────────────────────────────────────┴───────┴──────────┘
└────┴──────────┴──────────────────────────────────────────┴───────┴──────────┘

Key framework behaviors demonstrated:

  • AI ↔ AI uses the same primitives: no special multi-agent API. Two AIChannels in a Room, both read_write, both visibility=all.
  • chain_depth tracking: every re-entry increments depth. Human origin = 0, analyst response = 1, writer response = 2, etc.
  • chain_depth limit: prevents unbounded recursion. Uses existing blocked + blocked_by mechanism.
  • Side effects always flow: even when the chain is stopped, tasks and observations from the blocked event are stored.
  • Human supervision: advisor sees everything in real time and can mute an AI mid-chain or trigger a summary.
  • Dynamic control: integrator can mute the writer until the analyst emits an analysis_complete observation, then unmute. Same primitives as Use Case 5.


10. AI Channel Implementation

The AI Channel wraps an AIProvider. It doesn't know about permissions — it just produces output. The framework enforces the rules. The provider handles the actual AI interaction (pydantic-ai, Anthropic SDK, OpenAI SDK, or any custom implementation).

class AIChannel(Channel):
    """AI channel — wraps any AIProvider."""

    type = ChannelType.AI
    category = ChannelCategory.INTELLIGENCE
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, provider: AIProvider, name: str = "ai"):
        self.provider = provider
        self.name = name

    @property
    def info(self) -> dict[str, Any]:
        return {"provider": self.provider.name, "model": self.provider.model_name}

    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        """Process event. Framework handles permissions — we just produce output."""

        # Skip own events (prevent loops)
        if event.source_channel == self.id:
            return ChannelOutput.empty()

        # Only process messages
        if event.type != EventType.MESSAGE:
            return ChannelOutput.empty()

        # Build context: timeline + target channel capabilities + media types
        history = self._build_messages(room.timeline)
        target_capabilities = room.get_target_capabilities(event.participant_id)
        target_media_types = room.get_target_media_types(event.participant_id)

        context = AIContext(
            room=room,
            target_capabilities=target_capabilities,
            target_media_types=target_media_types,
            system_instructions=self._get_instructions(room),
            metadata=room.metadata,
        )

        # Call the provider — could be pydantic-ai, Anthropic SDK, OpenAI, anything
        result = await self.provider.generate(messages=history, context=context)

        return ChannelOutput(
            events=[RoomEvent(
                content=TextContent(text=result.text),
                source=EventSource(
                    channel_type=ChannelType.AI,
                    provider=self.provider.name,
                    channel_data=AIChannelData(**result.provider_metadata),
                ),
            )],
            tasks=result.tasks,
            observations=result.observations,
        )

        # NOTE: The framework decides if these events actually get broadcast
        # based on the channel's access, muted status, and write_visibility.
        # This channel doesn't need to know about any of that.

Usage — same AIChannel, different providers:

from pydantic_ai import Agent
from roomkit.providers.ai import PydanticAIProvider, AnthropicDirectProvider, MockAIProvider

# Option 1: pydantic-ai (recommended — supports Anthropic, OpenAI, Gemini, etc.)
agent = Agent(model="anthropic:claude-sonnet-4-5", instructions="You help customers...")
fw.register_channel(AIChannel(provider=PydanticAIProvider(agent), name="support"))

# Option 2: Anthropic SDK directly
from anthropic import AsyncAnthropic
client = AsyncAnthropic(api_key="sk-...")
fw.register_channel(AIChannel(provider=AnthropicDirectProvider(client, model="claude-sonnet-4-5"), name="support"))

# Option 3: Mock for testing
fw.register_channel(AIChannel(provider=MockAIProvider(responses=["Hello!"]), name="test_ai"))

# Option 4: Custom provider (integrator writes their own AIProvider)
fw.register_channel(AIChannel(provider=MyCustomProvider(...), name="custom_ai"))

The channel is dumb about permissions. It just produces output. The Event Router enforces the rules. This means the same AI channel can behave differently in different rooms depending on how the integrator configures its binding. And the same AIChannel class works with any AI provider.


11. Conversation Store

11.1 Interface

ConversationStore (ABC)
├── Rooms
│   ├── create_room(org_id, metadata) -> Room
│   ├── get_room(room_id) -> Room | None
│   ├── find_rooms(org_id, filters) -> list[Room]
│   ├── update_room(room_id, updates) -> Room
│   └── close_room(room_id) -> Room
├── Events
│   ├── add_event(room_id, event) -> RoomEvent
│   ├── get_timeline(room_id, since, limit, offset, visibility_filter) -> list[RoomEvent]
│   └── get_event(event_id) -> RoomEvent | None
├── Channels
│   ├── attach_channel(room_id, binding) -> ChannelBinding
│   ├── detach_channel(room_id, channel_id) -> None
│   ├── update_binding(room_id, channel_id, updates) -> ChannelBinding
│   └── get_bindings(room_id) -> list[ChannelBinding]
├── Participants
│   ├── add_participant(room_id, participant) -> Participant
│   ├── remove_participant(room_id, participant_id) -> None
│   └── get_participants(room_id) -> list[Participant]
├── Identity
│   ├── create_identity(org_id, data) -> Identity
│   ├── resolve_identity(channel, address, org_id) -> Identity | None
│   ├── link_address(identity_id, channel, address) -> None
│   └── get_identity(identity_id) -> Identity | None
├── Tasks
│   ├── create_task(room_id, task) -> Task
│   ├── get_tasks(room_id | org_id, filters) -> list[Task]
│   └── update_task(task_id, updates) -> Task
└── Observations
    ├── add_observation(room_id, observation) -> Observation
    └── get_observations(room_id, filters) -> list[Observation]

11.2 Implementations

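┌────────────────┬───────────────────────────────┐
│ Implementation │ Purpose                       │
├────────────────┼───────────────────────────────┤
│ InMemoryStore  │ Testing, prototyping          │
│ PostgresStore  │ Production (SQLAlchemy async) │
│ CustomStore    │ Integrator wraps their own DB │
└────────────────┴───────────────────────────────┘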
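
To make the store contract concrete, here is a minimal sketch of what a custom or in-memory store could look like. It covers only rooms and the timeline, and it is not the library's InMemoryStore: the class name TinyInMemoryStore, the Room dataclass, the dict-shaped events, and the choice of async methods are all assumptions made for illustration.

```python
import asyncio
import itertools
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Room:
    """Hypothetical stand-in for the Room model."""
    id: str
    org_id: str
    metadata: dict[str, Any] = field(default_factory=dict)
    status: str = "open"


class TinyInMemoryStore:
    """Illustrative subset of the ConversationStore interface (rooms + events)."""

    def __init__(self) -> None:
        self._rooms: dict[str, Room] = {}
        self._timelines: dict[str, list[dict[str, Any]]] = {}
        self._ids = itertools.count(1)

    async def create_room(self, org_id: str, metadata: Optional[dict[str, Any]] = None) -> Room:
        room = Room(id=f"room_{next(self._ids)}", org_id=org_id, metadata=metadata or {})
        self._rooms[room.id] = room
        self._timelines[room.id] = []
        return room

    async def get_room(self, room_id: str) -> Optional[Room]:
        return self._rooms.get(room_id)

    async def add_event(self, room_id: str, event: dict[str, Any]) -> dict[str, Any]:
        self._timelines[room_id].append(event)
        return event

    async def get_timeline(self, room_id: str, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
        # Pagination only; `since` and `visibility_filter` are omitted here.
        return self._timelines[room_id][offset:offset + limit]


async def demo() -> tuple[Optional[Room], list[dict[str, Any]]]:
    store = TinyInMemoryStore()
    room = await store.create_room("org_acme", {"topic": "billing"})
    for n in range(3):
        await store.add_event(room.id, {"seq": n, "text": f"message {n}"})
    return await store.get_room(room.id), await store.get_timeline(room.id, limit=2, offset=1)


room, page = asyncio.run(demo())
```

A real implementation would also cover bindings, participants, identity, tasks, and observations from the interface in 11.1.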

12. Error Handling & Resilience

12.1 Send Results

SendResult
├── status: sent | queued | failed
├── provider_message_id: str | None
├── error: SendError | None (code, message, retryable)
└── retry_after: datetime | None
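
As a rough illustration, the tree above could map onto types like the following. This sketch uses stdlib dataclasses to stay self-contained; the class names SendStatus and SendError and the should_retry helper are assumptions, not part of the RFC.

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional


class SendStatus(str, Enum):
    SENT = "sent"
    QUEUED = "queued"
    FAILED = "failed"


@dataclass(frozen=True)
class SendError:
    code: str
    message: str
    retryable: bool


@dataclass(frozen=True)
class SendResult:
    status: SendStatus
    provider_message_id: Optional[str] = None
    error: Optional[SendError] = None
    retry_after: Optional[datetime] = None

    @property
    def should_retry(self) -> bool:
        """Hypothetical helper: True only for failures marked retryable."""
        return (
            self.status is SendStatus.FAILED
            and self.error is not None
            and self.error.retryable
        )


ok = SendResult(SendStatus.SENT, provider_message_id="SM123")
throttled = SendResult(
    SendStatus.FAILED,
    error=SendError("429", "Too Many Requests", retryable=True),
)
```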

12.2 Retry

  • Exponential backoff with configurable max retries
  • Per-channel retry policies
  • Fallback channel option (SMS fails → try Email)
  • Dead letter queue for permanent failures
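
The retry bullets above can be sketched as one small function. This is an illustration under stated assumptions, not the framework's retry engine: the Sender callable type, the string return values, and the list used as a dead letter queue are hypothetical, and a real provider would return a SendResult rather than a bool.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical signature: True on success, False on a retryable failure.
Sender = Callable[[str], Awaitable[bool]]


def backoff_delays(base: float, max_retries: int, cap: float = 60.0) -> list[float]:
    """Delay before each retry: base, 2*base, 4*base, ... capped at `cap`."""
    return [min(cap, base * 2**attempt) for attempt in range(max_retries)]


async def send_with_retry(
    text: str,
    primary: Sender,
    fallback: Optional[Sender] = None,
    dead_letters: Optional[list[str]] = None,
    base: float = 0.01,
    max_retries: int = 3,
) -> str:
    """Try the primary channel, retry with backoff, then fallback, then dead-letter."""
    if await primary(text):
        return "sent"
    for delay in backoff_delays(base, max_retries):
        await asyncio.sleep(delay)
        if await primary(text):
            return "sent"
    # Retries exhausted: try the fallback channel once (e.g. SMS fails, try Email).
    if fallback is not None and await fallback(text):
        return "sent_via_fallback"
    if dead_letters is not None:
        dead_letters.append(text)
    return "failed"
```

Per-channel policies would then amount to different base, max_retries, and fallback values per channel binding.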

12.3 Rate Limiting

  • Declared per channel via ChannelCapabilities.rate_limit
  • Framework queues when rate-limited (no drops)
  • Per-organization limits supported
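
One way to get "queue, don't drop" behavior is to make senders wait for the next free slot. The sketch below assumes a simple fixed-interval limiter; the class name QueueingRateLimiter and its rate parameter are hypothetical, and how the value would be read from ChannelCapabilities.rate_limit is not shown.

```python
import asyncio
import time


class QueueingRateLimiter:
    """Delays callers so that at most `rate` sends start per second.

    Nothing is dropped: each caller is assigned the next free time slot.
    """

    def __init__(self, rate: float) -> None:
        self._interval = 1.0 / rate
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + self._interval
        # Sleep outside the lock so other callers can reserve their slots.
        await asyncio.sleep(slot - now)


async def demo() -> list[int]:
    limiter = QueueingRateLimiter(rate=100)  # 100 sends per second
    delivered: list[int] = []

    async def send(n: int) -> None:
        await limiter.acquire()
        delivered.append(n)

    await asyncio.gather(*(send(n) for n in range(5)))
    return delivered
```

Per-organization limits could use one limiter instance per organization ID.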

12.4 Circuit Breaker

  • Opens after N consecutive provider failures
  • Emits system RoomEvent
  • Periodic probe to detect recovery
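
A minimal sketch of the breaker logic described above, assuming a time-based probe. The class and method names are hypothetical, and emitting the system RoomEvent on open/close is left out.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after `threshold` consecutive failures; allows a probe after `probe_after` seconds."""

    def __init__(
        self,
        threshold: int = 3,
        probe_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.threshold = threshold
        self.probe_after = probe_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def is_open(self) -> bool:
        return self._opened_at is not None

    def allow_request(self) -> bool:
        """Closed: always allow. Open: allow a probe once the interval has passed."""
        if self._opened_at is None:
            return True
        return self._clock() - self._opened_at >= self.probe_after

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # provider recovered, close the circuit

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.threshold:
            self._opened_at = self._clock()  # open, or restart the probe timer
```

The injectable clock is there only to make the behavior easy to test without sleeping.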

12.5 AI Resilience

  • LLM timeout → configurable fallback (canned response or escalate)
  • Token limit → trigger conversation summarization
  • Model unavailable → circuit breaker on AI provider
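
The timeout case can be illustrated with asyncio.wait_for. This is a sketch only: generate_with_fallback and CANNED_RESPONSE are hypothetical names, and the escalation path mentioned above is not shown.

```python
import asyncio
from typing import Awaitable, Callable

CANNED_RESPONSE = "Sorry, I need a moment. An advisor will follow up shortly."


async def generate_with_fallback(
    generate: Callable[[], Awaitable[str]],
    timeout_s: float,
    fallback: str = CANNED_RESPONSE,
) -> str:
    """Return the model's text, or the canned fallback if the call times out."""
    try:
        return await asyncio.wait_for(generate(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return fallback
```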

12.6 Event Chain Depth Limit

When an event is broadcast to a channel with on_event() (e.g., an AI channel), the channel's response re-enters the pipeline as a new event. That new event may itself trigger another channel's on_event(), and so on. Without a bound, this creates unbounded recursion.

Mechanism:

  • RoomKit config: max_event_chain_depth: int = 10 (global, like sys.setrecursionlimit())
  • Tracking: The Event Router sets chain_depth=0 on human/inbound events. When a channel's on_event() produces a response that re-enters the pipeline, the response event's chain_depth is set to parent.chain_depth + 1.
  • Enforcement: When chain_depth >= max_event_chain_depth, the response event is blocked using the existing mechanism: status=blocked, blocked_by="event_chain_depth_limit".
  • Observability: An observation is emitted (type="event_chain_depth_exceeded") with the chain depth, source channel, and room ID.
  • Side effects still flow: Consistent with mute behavior — tasks and observations from the blocked event are still stored and dispatched.

Example: A room with two AI channels that react to each other's messages. The originating event has chain_depth=0 and the responses at depths 1 through 9 are delivered; the next response, which would have chain_depth=10, is blocked. The integrator sees the observation and can react (e.g., alert, mute one channel).

What this does NOT do:

  • No per-room or per-channel depth config: it's a global safety limit
  • No new enum: uses existing EventStatus.blocked + blocked_by
  • No new hook trigger: integrator uses existing after_broadcast or observations to react
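
The tracking and enforcement rules can be sketched in a few lines. This is an illustration, not RoomKit's implementation: Event, stamp_response, and MAX_EVENT_CHAIN_DEPTH are hypothetical names. Only chain_depth, status, blocked_by, and the default of 10 come from the text above.

```python
from dataclasses import dataclass
from typing import Optional

MAX_EVENT_CHAIN_DEPTH = 10  # mirrors the default of max_event_chain_depth


@dataclass
class Event:
    """Hypothetical stand-in for RoomEvent with only the fields used here."""
    source: str
    chain_depth: int = 0
    status: str = "delivered"
    blocked_by: Optional[str] = None


def stamp_response(parent: Event, source: str) -> Event:
    """Assign depth = parent depth + 1 and block once the limit is reached."""
    depth = parent.chain_depth + 1
    if depth >= MAX_EVENT_CHAIN_DEPTH:
        return Event(source, depth, "blocked", "event_chain_depth_limit")
    return Event(source, depth)


# Two AI channels answering each other, starting from a human message.
timeline = [Event(source="ws_adv")]
speakers = ["analyst", "writer"]
while timeline[-1].status == "delivered":
    next_speaker = speakers[(len(timeline) - 1) % 2]
    timeline.append(stamp_response(timeline[-1], next_speaker))
```

With strict alternation, the loop stops after eleven events: the human message at depth 0, nine delivered responses, and one blocked response at depth 10.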

12.7 Observability

RoomKit uses Python's standard logging module. No third-party dependency. The integrator configures handlers, formatters, and log levels — the library just emits logs.

Named loggers:

Every module has its own logger. The integrator controls granularity.

roomkit                         # root logger
roomkit.core.framework          # RoomKit lifecycle (create_room, attach_channel, etc.)
roomkit.core.router             # EventRouter — broadcast decisions, permission checks, transcoding
roomkit.core.hooks              # HookEngine — hook execution, block/modify/allow decisions
roomkit.core.locks              # RoomLockManager — lock acquire/release, LRU eviction
roomkit.channels.sms            # SMSChannel — inbound parsing, delivery
roomkit.channels.email          # EmailChannel
roomkit.channels.websocket      # WebSocketChannel — connect/disconnect, delivery
roomkit.channels.ai             # AIChannel — context building, AI generation, response handling
roomkit.channels.whatsapp       # WhatsAppChannel
roomkit.providers.sms.twilio    # TwilioSMSProvider — HTTP calls, webhook parsing
roomkit.providers.sms.sinch     # SinchSMSProvider
roomkit.providers.email.sendgrid # SendGridProvider
roomkit.providers.ai.pydantic_ai # PydanticAIProvider — prompt building, token usage
roomkit.providers.ai.anthropic  # AnthropicDirectProvider
roomkit.providers.ai.openai     # OpenAIDirectProvider
roomkit.identity                # IdentityResolver — resolve, ambiguous, unknown decisions
roomkit.store                   # ConversationStore — save/get operations

Log levels — what gets logged where:

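┌─────────┬────────────────────────────────────────────┬──────────────────────────────────────────────┐
│ Level   │ What                                       │ Example                                      │
├─────────┼────────────────────────────────────────────┼──────────────────────────────────────────────┤
│ DEBUG   │ Full pipeline trace, raw payloads, hook    │ "EventRouter: delivering evt_abc to          │
│         │ decisions, delivery details                │ sms_customer (chain_depth=0)"                │
│ INFO    │ Room created, event stored, channel        │ "Room room_8f3a created for org_acme"        │
│         │ attached/detached, participant resolved    │                                              │
│ WARNING │ Delivery failed (retryable), hook timeout, │ "SMSChannel: delivery failed to +15551234567 │
│         │ chain depth approaching limit, identity    │ (retryable=True, attempt=2/3)"               │
│         │ ambiguous                                  │                                              │
│ ERROR   │ Provider error (non-retryable), circuit    │ "TwilioSMSProvider: 401 Unauthorized - check │
│         │ breaker opened, store write failure        │ credentials"                                 │
└─────────┴────────────────────────────────────────────┴──────────────────────────────────────────────┘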

Structured log context:

Every log message includes structured context, passed through the extra argument of the logging call:

logger.info(
    "Event delivered",
    extra={
        "room_id": "room_8f3a",
        "event_id": "evt_abc",
        "channel_id": "sms_customer",
        "provider": "twilio",
        "chain_depth": 0,
        "status": "sent",
        "provider_message_id": "SM1234567890",
        "latency_ms": 245,
    },
)

The integrator's formatter decides how to render this — plain text, JSON, or whatever their log pipeline expects.
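
As an example of the integrator side, a formatter can render the extra fields as JSON using only the standard library. This is a sketch: JsonFormatter is a hypothetical name, and it treats every LogRecord attribute that is not part of a default record as having come from extra.

```python
import json
import logging

# Attribute names present on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None)))
_STANDARD_ATTRS |= {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render the message plus any `extra` fields as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)


# Build a record the same way logger.info(..., extra={...}) would.
sample = logging.getLogger("roomkit.core.router").makeRecord(
    "roomkit.core.router", logging.INFO, "router.py", 0, "Event delivered", (), None,
    extra={"room_id": "room_8f3a", "event_id": "evt_abc", "latency_ms": 245},
)
line = JsonFormatter().format(sample)
```

In practice the formatter would be attached with handler.setFormatter(JsonFormatter()) on a handler added to the "roomkit" logger.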

Delivery tracing — debugging "my email wasn't sent":

The full lifecycle of a delivery is logged:

DEBUG roomkit.core.router       EventRouter: broadcasting evt_abc to 3 channels
DEBUG roomkit.core.router       EventRouter: channel sms_customer — access=read_write, muted=False, visible=True → DELIVER
DEBUG roomkit.channels.sms      SMSChannel: delivering evt_abc to +15551234567
DEBUG roomkit.providers.sms.twilio TwilioSMSProvider: POST /Messages {to: +15551234567, body: "Bonjour..."}
DEBUG roomkit.providers.sms.twilio TwilioSMSProvider: 201 Created {sid: SM123, status: queued}
INFO  roomkit.core.router       Event evt_abc delivered to sms_customer (provider_message_id=SM123, latency_ms=245)

DEBUG roomkit.core.router       EventRouter: channel email_customer — access=read_write, muted=False, visible=True → DELIVER
DEBUG roomkit.channels.email    EmailChannel: delivering evt_abc to client@example.com
DEBUG roomkit.providers.email.sendgrid SendGridProvider: POST /mail/send {to: client@example.com}
WARNING roomkit.providers.email.sendgrid SendGridProvider: 429 Too Many Requests (retry_after=30s)
WARNING roomkit.channels.email  EmailChannel: delivery failed for evt_abc — retryable, queued for retry
ERROR roomkit.channels.email    EmailChannel: delivery failed for evt_abc after 3 attempts — SendGrid 429

DEBUG roomkit.core.router       EventRouter: channel ai_support — access=read, muted=False → ON_EVENT (intelligence)
DEBUG roomkit.channels.ai       AIChannel: building context for room_8f3a (5 messages, target=sms)
DEBUG roomkit.providers.ai.pydantic_ai PydanticAIProvider: generating (model=claude-sonnet-4-5, tokens_in=1250)
DEBUG roomkit.providers.ai.pydantic_ai PydanticAIProvider: complete (tokens_out=89, latency_ms=1340)
INFO  roomkit.core.router       AIChannel produced response — re-entering pipeline (chain_depth=1)

Framework events for monitoring:

In addition to logs, RoomKit emits structured framework events for programmatic observability:

@fw.on("channel_error")
async def on_error(event: FrameworkEvent):
    # event.data = {"channel_id": "email_customer", "error": "429 Too Many Requests",
    #               "room_id": "room_8f3a", "event_id": "evt_abc", "retryable": True}
    await alerting.send(f"Channel {event.data['channel_id']} error: {event.data['error']}")

@fw.on("delivery_failed")
async def on_delivery_failed(event: FrameworkEvent):
    # event.data = {"channel_id": "email_customer", "event_id": "evt_abc",
    #               "provider": "sendgrid", "attempts": 3, "last_error": "429"}
    await metrics.increment("delivery.failed", tags={"provider": event.data["provider"]})
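
┌────────────────────────┬───────────────────────────────────────────┬────────────────────────────────────────────┐
│ Framework Event        │ When                                      │ Data                                       │
├────────────────────────┼───────────────────────────────────────────┼────────────────────────────────────────────┤
│ channel_error          │ Any provider error (retryable or not)     │ channel_id, error, room_id, retryable      │
│ delivery_failed        │ All retry attempts exhausted              │ channel_id, event_id, provider, attempts,  │
│                        │                                           │ last_error                                 │
│ delivery_succeeded     │ Event successfully delivered              │ channel_id, event_id, provider_message_id, │
│                        │                                           │ latency_ms                                 │
│ hook_timeout           │ A sync or async hook exceeded its timeout │ hook_name, trigger, timeout_ms, room_id    │
│ hook_error             │ A hook raised an exception                │ hook_name, trigger, error, room_id         │
│ circuit_breaker_opened │ Provider circuit breaker tripped          │ provider, channel_type, failure_count      │
│ circuit_breaker_closed │ Provider recovered                        │ provider, channel_type                     │
│ identity_ambiguous     │ Identity resolution returned multiple     │ address, channel_type, candidate_count     │
│                        │ candidates                                │                                            │
│ chain_depth_exceeded   │ Event blocked by chain depth limit        │ room_id, channel_id, chain_depth           │
└────────────────────────┴───────────────────────────────────────────┴────────────────────────────────────────────┘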

Integration service adds:

The library provides raw logs + framework events. The integration service adds the production observability stack:

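┌────────────────┬───────────────────────────────────────────────┬────────────────────────────────────────────────────┐
│ Concern        │ Library (roomkit)                             │ Integration Service                                │
├────────────────┼───────────────────────────────────────────────┼────────────────────────────────────────────────────┤
│ Logging        │ logging with named loggers + structured extra │ JSON formatter, log aggregation (CloudWatch,       │
│                │                                               │ Datadog, etc.)                                     │
│ Tracing        │ Logs include room_id, event_id, channel_id    │ OpenTelemetry spans, distributed tracing           │
│                │ for correlation                               │                                                    │
│ Metrics        │ Framework events (delivery_succeeded,         │ Prometheus counters/histograms, Grafana dashboards │
│                │ delivery_failed, etc.)                        │                                                    │
│ Alerting       │ Framework events with error details           │ PagerDuty, Slack alerts via @fw.on() handlers      │
│ Error tracking │ Python exceptions with full context           │ Sentry integration in hook/channel error handlers  │
└────────────────┴───────────────────────────────────────────────┴────────────────────────────────────────────────────┘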

Integrator setup — minimal:

import logging

# See everything during development
logging.getLogger("roomkit").setLevel(logging.DEBUG)

# Production: only warnings and errors
logging.getLogger("roomkit").setLevel(logging.WARNING)

# Fine-grained: debug one provider, silence the rest
logging.getLogger("roomkit").setLevel(logging.WARNING)
logging.getLogger("roomkit.providers.sms.twilio").setLevel(logging.DEBUG)

13. RoomKit API (Integrator's View)

13.1 Minimal: Human ↔ AI via SMS

from roomkit import RoomKit
from roomkit.channels.sms import SMSChannel
from roomkit.channels.ai import AIChannel
from roomkit.providers.sms import TwilioSMSProvider
from roomkit.providers.ai import PydanticAIProvider
from roomkit.store import InMemoryStore
from roomkit.identity.mock import MockIdentityResolver
from pydantic_ai import Agent

agent = Agent(model="anthropic:claude-sonnet-4-5", instructions="You help customers...")

fw = RoomKit(
    store=InMemoryStore(),
    identity_resolver=MockIdentityResolver(),
)
fw.register_channel(SMSChannel(provider=TwilioSMSProvider(sid="...", token="...", number="+155")))
fw.register_channel(AIChannel(provider=PydanticAIProvider(agent), name="support"))

# Default room setup: attach AI with full access
@fw.hook(trigger="on_room_created", execution="async")
async def setup_room(room: RoomContext):
    await fw.attach_channel(room.id, "support", access="read_write", visibility="all")

# --- Integration service (NOT part of the library) ---
# The integrator wraps fw.* calls in their own web framework:
from fastapi import FastAPI, Request, Response
app = FastAPI()

@app.post("/webhooks/sms/twilio")
async def twilio_webhook(request: Request):
    payload = await request.form()
    event = await fw.process_inbound("sms_twilio_1", dict(payload))
    return Response(status_code=200)

@app.post("/api/v1/rooms")
async def create_room(body: CreateRoomRequest):
    room = await fw.create_room(organization_id=body.organization_id, metadata=body.metadata)
    return room

13.2 Full: Human ↔ Human + AI + Hooks + Permissions

from roomkit import RoomKit
from roomkit.store import InMemoryStore  # or PostgresStore from integration service
from roomkit.identity import IdentityResolver

fw = RoomKit(
    store=InMemoryStore(),
    identity_resolver=CRMIdentityResolver(crm_client),
)

# Transport channels
fw.register_channel(SMSChannel(provider=TwilioSMSProvider(...)))
fw.register_channel(HTTPChannel())

# AI channels (provider-agnostic — swap PydanticAIProvider for any AIProvider)
fw.register_channel(AIChannel(provider=PydanticAIProvider(support_agent), name="support"))
fw.register_channel(AIChannel(provider=PydanticAIProvider(task_agent), name="task_creator"))

# --- Hooks: PII blocking (sync, blocks events) ---
@fw.hook(trigger="before_broadcast", execution="sync", priority=0)
async def pii_blocker(event: RoomEvent, room: RoomContext) -> HookResult:
    if event.type == EventType.MESSAGE and contains_pii(event.content.text):
        return HookResult.block("PII detected")
    return HookResult.allow()

# --- Hooks: audit logging (async) ---
@fw.hook(trigger="after_broadcast", execution="async")
async def audit_log(event: RoomEvent, room: RoomContext):
    await audit_service.log(event)

# --- Hooks: task dispatch (async) ---
@fw.hook(trigger="on_task_created", execution="async")
async def dispatch_task(task: Task, room: RoomContext):
    if task.type == "crm_update":
        await crm.update(task.data)
    elif task.type == "escalate":
        await fw.attach_channel(room.id, "http", access="read_write",
                                participant=on_call_agent)
        await fw.mute(room.id, "support")
        await fw.set_visibility(room.id, "support", Visibility.channels(["http"]))

# --- Hooks: room setup (async, fire-and-forget) ---
@fw.hook(trigger="on_room_created", execution="async")
async def setup_room(room: RoomContext):
    await fw.attach_channel(room.id, "support", access="read_write", visibility="all")
    await fw.attach_channel(room.id, "task_creator", access="read")  # read-only, side effects only

# --- Hooks: when human agent joins, reconfigure AI (async) ---
@fw.hook(trigger="on_channel_attached", execution="async")
async def on_agent_joins(event: ChannelEvent, room: RoomContext):
    if event.participant and event.participant.role == "agent":
        # Mute AI and restrict visibility to agents only
        for binding in room.channels:
            if binding.category == ChannelCategory.INTELLIGENCE:
                await fw.mute(room.id, binding.channel_id)
                await fw.set_visibility(room.id, binding.channel_id,
                                       Visibility.channels(["http"]))

# --- Integration service (NOT part of the library) ---
# The integrator wraps fw.* calls in their own web framework:
from fastapi import FastAPI, Request, Response
app = FastAPI()

@app.post("/webhooks/sms/twilio")
async def twilio_webhook(request: Request):
    payload = await request.form()
    await fw.process_inbound("sms_twilio_1", dict(payload))
    return Response(status_code=200)

# ... more routes wrapping fw.* calls
# MCP server is also mounted by the integration service, not the library.

14. RoomKit Primitives Summary

The framework provides primitives. Business logic is in the integrator's hooks and code.

┌──────────────────────────────────────┬──────────────────────────────────────┐
│          FRAMEWORK (primitives)       │      INTEGRATOR (business logic)     │
├──────────────────────────────────────┼──────────────────────────────────────┤
│ Channel access: read | write | rw    │ Which access level each channel gets │
│ Mute / unmute a channel              │ When to mute/unmute                  │
│ Visibility: all | channels | internal│ What visibility each channel gets    │
│ Attach / detach channels             │ Which channels to attach/detach      │
│ Hook pipeline (sync/async)           │ Hook handlers (PII, compliance, etc.)│
│ Hook block + inject targeted events  │ What to block, what to inject        │
│ Event status (delivered/blocked/fail)│ How to query and audit blocked events│
│ Event routing respects permissions   │ Room setup and channel configuration │
│ Two output paths (events vs effects) │ What tasks to create, how to act     │
│ Timeline stores everything (+ blocked)│ What to do with timeline data       │
│ REST API + MCP expose operations     │ Authentication, authorization        │
│ Provider abstraction                 │ Which provider to use                │
│ Identity resolution ABC              │ Resolution strategy                  │
│ Store ABC                            │ Store implementation choice          │
│ Event chain depth limit (configurable)│ Turn budgets, orchestration patterns│
└──────────────────────────────────────┴──────────────────────────────────────┘

What the framework does NOT decide:

  • When to mute an AI → integrator hook
  • When to make AI visible only to agents → integrator hook
  • What "observer mode" means → integrator configures access=read
  • What "assistant mode" means → integrator configures visibility=channels(["http"])
  • When to escalate to human → integrator hook on task/observation
  • What routing to use → integrator hook on room_created
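
The split between the two output paths can be sketched as follows. This is a simplified illustration: Binding, Output, and apply_output are hypothetical names, visibility is ignored, and what status the framework records for a muted channel's events is not modeled here.

```python
from dataclasses import dataclass, field


@dataclass
class Binding:
    access: str = "read_write"  # "read" | "write" | "read_write"
    muted: bool = False


@dataclass
class Output:
    events: list[str] = field(default_factory=list)
    tasks: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)


def apply_output(binding: Binding, out: Output) -> dict[str, list[str]]:
    """Split a channel's output into what is broadcast and what is stored."""
    can_speak = "write" in binding.access and not binding.muted
    return {
        "broadcast": list(out.events) if can_speak else [],
        # Side effects bypass the permission check entirely.
        "stored_tasks": list(out.tasks),
        "stored_observations": list(out.observations),
    }


out = Output(events=["reply"], tasks=["crm_update"], observations=["sentiment"])
muted_result = apply_output(Binding(muted=True), out)
observer_result = apply_output(Binding(access="read"), out)
normal_result = apply_output(Binding(), out)
```

The integrator's only job in this picture is to choose the Binding values; the routing decision itself stays in the framework.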


15. Scope & Phasing

Phase 0: Core Models & Primitives

  • Pydantic models: Room, RoomEvent, EventContent, ChannelBinding, Participant, Identity, Task, Observation
  • Permission system: Access, Muted, Visibility
  • Channel ABC (unified interface)
  • ChannelOutput with two paths (events + side effects)
  • ConversationStore ABC + InMemoryStore
  • Event Router (broadcast with permission enforcement)
  • Hook engine (sync + async pipeline)
  • Event chain depth tracking (library-level safety primitive)
  • Goal: Validate model against all 5 use cases with unit tests
  • Deliverable: Models + Store + Router + Hooks + Permissions. No real channels.

Phase 1: HTTP + SMS + Single AI Channel

  • HTTPChannel (WebSocket for browser users)
  • SMSChannel + TwilioSMSProvider
  • AIChannel (pydantic-ai)
  • Provider abstraction (SMSProvider ABC)
  • Channel capability injection into AI
  • Basic identity resolution
  • REST API: rooms, events, channels, timeline, permissions
  • Goal: Use Cases 1 and 2 end-to-end

Phase 2: Dynamic Permissions + Tasks

  • Full permission lifecycle: mute/unmute, set_access, set_visibility via API
  • Task and observation storage
  • Side effects always flow (even when muted)
  • on_task_created, on_channel_attached hooks
  • Multiple AI channels per Room with different permissions
  • Goal: Use Cases 3, 4, 5 (dynamic combinations with permissions)

Phase 3: MCP Server

  • MCP tools: send_message, create_task, mute/unmute, set_visibility
  • MCP resources: room, timeline, channels, tasks
  • AI agents interact with Rooms via MCP
  • Goal: AI-native Room interaction

Phase 4: Email + WhatsApp

  • EmailChannel + SendGridProvider / SMTPProvider
  • WhatsAppChannel + MetaProvider
  • Rich EventContent (buttons, cards, media)
  • Channel-specific ChannelData
  • Goal: Rich content across diverse channels

Phase 5: Resilience & Production

  • Retry + exponential backoff
  • Rate limiting
  • Circuit breaker
  • PostgresStore
  • Dead letter queue
  • Delivery status tracking
  • Structured logging, metrics hooks
  • Goal: Production-grade reliability

Phase 6: Extensibility

  • Plugin system for custom channels and providers
  • Custom event types
  • Advanced identity resolution
  • Conversation summarization
  • Multi-tenant channel configuration
  • Goal: Third parties can build on the framework

16. Open Questions

Resolved

┌────┬────────────────────────────────┬─────────────────────────────────┬──────────────────────────────────────────────────────┐
│ #  │ Question                       │ Decision                        │ Rationale                                            │
├────┼────────────────────────────────┼─────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Q1 │ Event Broadcasting Order       │ Concurrent (Option A)           │ Technical requirements use asyncio.TaskGroup for     │
│    │                                │                                 │ concurrent broadcast. Simpler, avoids priority       │
│    │                                │                                 │ complexity. Read-only channels that need to complete │
│    │                                │                                 │ before write channels can be handled via hooks.      │
│ Q2 │ Multiple Read-Write Channels   │ Let both respond (Option A)     │ Not an issue: the framework broadcasts the event. If │
│    │                                │                                 │ the integrator doesn't want two responders, they     │
│    │                                │                                 │ mute one. The framework provides primitives (mute,   │
│    │                                │                                 │ visibility). Business logic decides when to use      │
│    │                                │                                 │ them.                                                │
│ Q3 │ REST API Framework             │ Framework-agnostic (Option B)   │ The library has no web framework dependency. The     │
│    │                                │                                 │ integration service uses FastAPI. fw.* methods are   │
│    │                                │                                 │ plain async; the integrator wraps them in any        │
│    │                                │                                 │ framework.                                           │
│ Q4 │ HTTP Channel for Browser Users │ Both (Option C)                 │ WebSocketChannel for real-time, HTTPChannel as REST  │
│    │                                │                                 │ polling fallback. Both are in the project structure. │
│ Q5 │ Hook Registration              │ Both (Option C)                 │ Code decorators for core logic, POST                 │
│    │                                │                                 │ /rooms/{id}/hooks API for runtime dynamic rules.     │
│ Q6 │ Naming                         │ roomkit                         │ Clear, short, descriptive. The "Room" is the core    │
│    │                                │                                 │ abstraction.                                         │
│ Q7 │ Relation to TchatNSign         │ Independent open source library │ roomkit is not a TchatNSign library. It is built     │
│    │                                │                                 │ independently for open source release with           │
│    │                                │                                 │ community. TchatNSign uses roomkit as a dependency   │
│    │                                │                                 │ in its integration service; there is no coupling in  │
│    │                                │                                 │ the other direction.                                 │
└────┴────────────────────────────────┴─────────────────────────────────┴──────────────────────────────────────────────────────┘

All questions resolved. No open questions remain.


17. Key Design Principles

  1. The Room is the truth — All state lives in the Room. Timeline records everything.
  2. Everything is a Channel — SMS, browser, AI, integration. Same interface.
  3. Primitives, not opinions — The framework provides access/mute/visibility. Business logic decides when to use them.
  4. Two output paths — Room events (subject to permissions) and side effects (always flow). Muting silences the voice, not the brain.
  5. Providers are swappable — Channel type ≠ provider. Twilio and Sinch both provide SMS. Anthropic and OpenAI both provide AI.
  6. Hooks intercept — Sync hooks block/modify. Async hooks observe/react. Pipeline architecture.
  7. Channels are dynamic — Attach, detach, mute, unmute, reconfigure at any time.
  8. Channel awareness at generation — AI knows target constraints and media types before generating.
  9. Three layers of channel data — Channel.info (instance), ChannelBinding.metadata (per-room), EventSource (per-event). Never lose data.
  10. REST + MCP + WebSocket — REST for systems, MCP for AI, WebSocket for real-time. All call the same core.
  11. Direction declares capability — Channels declare inbound/outbound/bidirectional. Permissions restrict per room.
  12. Two event levels — Room events (per-room, stored) and framework events (global, for subscribers).
  13. Media types are first-class — text, audio, video. Route to compatible channels. Ready for the future.
  14. Chain depth safety — Event chains (channel A responds → triggers channel B → triggers channel A → ...) are bounded by a configurable depth limit. Uses existing blocked + blocked_by mechanism — no new concepts.
  15. Start narrow — Phase 0 validates the model. Phase 1 proves it. Then expand.

Appendix A: Project Structure

The library is a pure async Python package. REST API, WebSocket endpoints, MCP server, PostgresStore, and Redis pub/sub belong to the integration service (see Technical Requirements §11).

roomkit/
├── src/
│   └── roomkit/
│       ├── __init__.py                    # Public API exports
│       ├── py.typed                       # PEP 561 marker (typed package)
│       │
│       ├── core/                          # Framework internals
│       │   ├── __init__.py
│       │   ├── framework.py               # RoomKit class — entry point
│       │   ├── router.py                  # EventRouter — broadcast + permissions
│       │   ├── hooks.py                   # HookEngine — sync + async pipeline
│       │   └── locks.py                   # RoomLockManager — per-room locking
│       │
│       ├── models/                        # Pydantic data models (no I/O)
│       │   ├── __init__.py
│       │   ├── room.py                    # Room, RoomStatus, RoomTimers
│       │   ├── event.py                   # RoomEvent, EventType, EventStatus, EventContent
│       │   ├── channel.py                 # ChannelBinding, Access, Visibility, ChannelOutput
│       │   ├── participant.py             # Participant, IdentificationStatus
│       │   ├── identity.py                # Identity, IdentityResult, IdentityHookResult
│       │   ├── hook.py                    # HookResult, InjectedEvent, HookTrigger
│       │   ├── task.py                    # Task, Observation
│       │   └── delivery.py               # DeliveryResult, InboundResult
│       │
│       ├── channels/                      # Channel ABC + built-in channels
│       │   ├── __init__.py
│       │   ├── base.py                    # Channel ABC, ChannelType, ChannelCapabilities
│       │   ├── sms.py                     # SMSChannel
│       │   ├── email.py                   # EmailChannel
│       │   ├── whatsapp.py                # WhatsAppChannel
│       │   └── ai.py                      # AIChannel
│       │
│       ├── providers/                     # Provider ABCs + implementations
│       │   ├── __init__.py
│       │   ├── sms/
│       │   │   ├── __init__.py
│       │   │   ├── base.py               # SMSProvider ABC
│       │   │   ├── twilio.py             # TwilioSMSProvider (extra: twilio or httpx)
│       │   │   ├── sinch.py              # SinchSMSProvider (extra: httpx)
│       │   │   └── mock.py               # MockSMSProvider (no deps)
│       │   ├── email/
│       │   │   ├── __init__.py
│       │   │   ├── base.py               # EmailProvider ABC
│       │   │   ├── sendgrid.py           # SendGridProvider (extra: httpx)
│       │   │   └── mock.py               # MockEmailProvider (no deps)
│       │   ├── ai/
│       │   │   ├── __init__.py
│       │   │   ├── base.py               # AIProvider ABC, AIContext, AIResponse
│       │   │   ├── pydantic_ai.py        # PydanticAIProvider (extra: pydantic-ai)
│       │   │   ├── anthropic.py          # AnthropicDirectProvider (extra: anthropic)
│       │   │   ├── openai.py             # OpenAIDirectProvider (extra: openai)
│       │   │   └── mock.py               # MockAIProvider (no deps)
│       │   └── whatsapp/
│       │       ├── __init__.py
│       │       ├── base.py               # WhatsAppProvider ABC
│       │       └── mock.py               # MockWhatsAppProvider (no deps)
│       │
│       ├── store/                         # Conversation store
│       │   ├── __init__.py
│       │   ├── base.py                   # ConversationStore ABC
│       │   └── memory.py                 # InMemoryStore (no deps — testing + dev)
│       │
│       ├── identity/                      # Identity resolution
│       │   ├── __init__.py
│       │   ├── base.py                   # IdentityResolver ABC
│       │   └── mock.py                   # MockIdentityResolver (no deps — testing)
│       │
│       └── config/                        # Provider configs (plain Pydantic models)
│           ├── __init__.py
│           └── providers.py              # TwilioConfig, SinchConfig, AnthropicConfig, etc.
└── tests/
    ├── conftest.py                        # Shared fixtures (InMemoryStore, MockProviders)
    ├── unit/
    │   ├── test_models.py                # Pydantic model validation
    │   ├── test_router.py                # Event routing + permissions
    │   ├── test_hooks.py                 # Hook pipeline (sync + async)
    │   ├── test_identity.py              # Identity resolution + ambiguity
    │   └── test_channels.py              # Channel ABC contract tests
    └── integration/
        ├── test_framework.py             # Full pipeline: inbound → hooks → broadcast
        ├── test_use_case_1.py            # Human <-> Human (cross-channel)
        ├── test_use_case_2.py            # Human <-> AI
        ├── test_use_case_3.py            # Human <-> Human + AI assistant
        ├── test_use_case_4.py            # Human <-> Human + Observer
        ├── test_use_case_5.py            # Dynamic channel management
        ├── test_use_case_6.py            # SIN sensitivity scanning
        └── test_use_case_7.py            # Ambiguous identity resolution

What is NOT in the library:

Missing from lib | Why | Where it lives
api/ directory | REST API is an integration concern | Integration service
ws.py | WebSocket endpoint needs a web framework | Integration service
mcp.py | MCP server needs fastmcp | Integration service
store/postgres.py | Needs SQLAlchemy + asyncpg | Integration service or separate package
store/migrations/ | Needs Alembic | Integration service
realtime.py | Needs Redis pub/sub or similar | Integration service
e2e tests | Need FastAPI TestClient, HTTP server | Integration service tests

Appendix B: Class Hierarchy & Architecture

Complete class hierarchy showing how all pieces compose:

┌─────────────────────────────────────────────────────────────────────────────┐
│                           FRAMEWORK (entry point)                            │
│                                                                             │
│  RoomKit                                                                    │
│  ├── store: ConversationStore          (ABC → InMemoryStore | PostgresStore)│
│  ├── channels: dict[str, Channel]      (registered channel instances)       │
│  ├── hooks: HookEngine                 (sync + async hook pipeline)         │
│  ├── router: EventRouter               (broadcast + permission enforcement) │
│  ├── realtime: RealtimeManager         (WebSocket connections + event bus)  │
│  ├── identity_resolver: IdentityResolver (ABC → pluggable resolution)      │
│  ├── max_event_chain_depth: int = 10     (global safety limit for chains)  │
│  │                                                                          │
│  │  Operations:                                                             │
│  ├── register_channel(channel)         (global: add channel to framework)   │
│  ├── create_room(org_id, metadata)     (create conversation space)          │
│  ├── attach_channel(room_id, ...)      (connect channel to room)            │
│  ├── detach_channel(room_id, ...)      (disconnect from room)               │
│  ├── mute / unmute / set_access / set_visibility (permission primitives)    │
│  ├── update_room_metadata(...)         (mutable room metadata)              │
│  ├── update_binding_metadata(...)      (mutable binding metadata)           │
│  │                                                                          │
│  │  Event subscription:                                                     │
│  ├── hook(trigger, execution, priority) (register hook — decorator)         │
│  ├── on(event_name)                     (subscribe to framework events)     │
│  │                                                                          │
│  │  Note: REST API, WebSocket endpoints, and MCP server are NOT part of     │
│  │  the library. The integrator wraps fw.* calls in their own web framework.│
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                           CHANNEL (ABC — unified interface)                  │
│                                                                             │
│  Channel (ABC)                                                              │
│  ├── id: str                                                                │
│  ├── type: ChannelType                 (sms | email | websocket | ai | ...) │
│  ├── category: ChannelCategory         (transport | intelligence | integr.) │
│  ├── direction: ChannelDirection       (inbound | outbound | bidirectional) │
│  ├── media_types: list[ChannelMediaType]  (text | audio | video)           │
│  ├── capabilities: ChannelCapabilities (max_text_length, supports_*, ...)  │
│  ├── info: dict[str, Any]             (property — instance config info)     │
│  │                                                                          │
│  │  Methods:                                                                │
│  ├── handle_inbound(raw) → InboundResult    (INBOUND: webhook/WS arrives)  │
│  ├── deliver(event, binding) → DeliveryResult (OUTBOUND: push event out)   │
│  └── on_event(event, room) → ChannelOutput   (READ: react to room event)  │
│                                                                             │
│  Concrete channels:                                                         │
│  ├── SMSChannel(provider: SMSProvider)          direction=BIDIRECTIONAL     │
│  ├── EmailChannel(provider: EmailProvider)      direction=BIDIRECTIONAL     │
│  ├── WebSocketChannel()                          direction=BIDIRECTIONAL     │
│  ├── HTTPChannel()                               direction=BIDIRECTIONAL     │
│  ├── WhatsAppChannel(provider: WhatsAppProvider) direction=BIDIRECTIONAL    │
│  ├── AIChannel(provider: AIProvider)             direction=BIDIRECTIONAL     │
│  └── CustomChannel(...)                          direction=varies            │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                           PROVIDERS (swappable implementations)              │
│                                                                             │
│  Provider ABCs (one per channel type that needs external services):          │
│                                                                             │
│  SMSProvider (ABC)                                                          │
│  ├── name: str                                                              │
│  ├── from_number: str                                                       │
│  ├── send_sms(to, body) → ProviderResult                                   │
│  ├── parse_webhook(payload) → InboundMessage                                │
│  │                                                                          │
│  │  Implementations:                                                        │
│  │  ├── TwilioSMSProvider(config: TwilioConfig)                            │
│  │  ├── SinchSMSProvider(config: SinchConfig)                              │
│  │  └── MockSMSProvider()                          (testing)                │
│  │                                                                          │
│  EmailProvider (ABC)                                                        │
│  ├── name: str                                                              │
│  ├── send_email(to, subject, body) → ProviderResult                        │
│  ├── parse_inbound(payload) → InboundEmail                                  │
│  │                                                                          │
│  │  Implementations:                                                        │
│  │  ├── SendGridProvider(config: SendGridConfig)                           │
│  │  ├── SMTPProvider(config: SMTPConfig)                                   │
│  │  └── MockEmailProvider()                                                 │
│  │                                                                          │
│  AIProvider (ABC)                                                           │
│  ├── name: str                                                              │
│  ├── model_name: str                                                        │
│  ├── generate(messages, context: AIContext) → AIResponse                    │
│  │                                                                          │
│  │  Implementations:                                                        │
│  │  ├── PydanticAIProvider(agent: pydantic_ai.Agent)   (recommended)       │
│  │  ├── AnthropicDirectProvider(client, model)                              │
│  │  ├── OpenAIDirectProvider(client, model)                                 │
│  │  └── MockAIProvider(responses: list[str])           (testing)            │
│  │                                                                          │
│  WhatsAppProvider (ABC)                                                     │
│  │  Implementations:                                                        │
│  │  └── MetaWhatsAppProvider(config: MetaConfig)                           │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                           PROVIDER CONFIG (typed configuration)              │
│                                                                             │
│  Each provider has a typed Pydantic config:                                 │
│                                                                             │
│  TwilioConfig                                                               │
│  ├── account_sid: str                                                       │
│  ├── auth_token: SecretStr                                                  │
│  ├── from_number: str                                                       │
│  └── webhook_url: str | None                                                │
│                                                                             │
│  SinchConfig                                                                │
│  ├── app_id: str                                                            │
│  ├── app_secret: SecretStr                                                  │
│  └── from_number: str                                                       │
│                                                                             │
│  SendGridConfig                                                             │
│  ├── api_key: SecretStr                                                     │
│  ├── from_email: str                                                        │
│  └── reply_to: str | None                                                   │
│                                                                             │
│  PydanticAIConfig                                                           │
│  ├── model: str                        (e.g., "anthropic:claude-sonnet-4-5")│
│  ├── instructions: str                                                      │
│  └── temperature: float = 0.7                                               │
│                                                                             │
│  AnthropicConfig                                                            │
│  ├── api_key: SecretStr                                                     │
│  ├── model: str = "claude-sonnet-4-5"                                       │
│  ├── max_tokens: int = 4096                                                 │
│  └── temperature: float = 0.7                                               │
│                                                                             │
│  Why typed configs?                                                         │
│  - Pydantic validates at startup (fail fast on bad config)                  │
│  - SecretStr prevents accidental logging of credentials                     │
│  - IDE autocompletion for provider setup                                    │
│  - Can be loaded from env vars, YAML, JSON, or Python                      │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                           MODELS (Pydantic data models)                      │
│                                                                             │
│  Room                                                                       │
│  ├── id: str                                                                │
│  ├── organization_id: str                                                   │
│  ├── status: RoomStatus                (active | paused | closed | archived)│
│  ├── channels: list[ChannelBinding]                                         │
│  ├── participants: list[Participant]                                        │
│  ├── timeline: list[RoomEvent]                                              │
│  ├── metadata: dict[str, Any]          (mutable via API)                    │
│  ├── created_at / updated_at: datetime                                      │
│  │                                                                          │
│  ChannelBinding (per-room channel permissions)                              │
│  ├── channel_id: str                                                        │
│  ├── room_id: str                                                           │
│  ├── access: Access                    (read | write | read_write)          │
│  ├── muted: bool                                                            │
│  ├── write_visibility: Visibility      (all | channels([...]) | internal)   │
│  ├── participant_id: str | None                                             │
│  ├── metadata: dict[str, Any]          (mutable via API)                    │
│  ├── attached_at: datetime                                                  │
│  │                                                                          │
│  RoomEvent                                                                  │
│  ├── id: str                                                                │
│  ├── room_id: str                                                           │
│  ├── index: int                          (sequential per room)              │
│  ├── type: EventType                                                        │
│  ├── status: EventStatus                 (delivered | blocked | failed)     │
│  ├── blocked_by: str | None              (hook name, if status=blocked)     │
│  ├── source_channel: str                                                    │
│  ├── participant_id: str | None                                             │
│  ├── content: EventContent             (TextContent | AudioContent | ...)   │
│  ├── source: EventSource               (channel-specific data + raw_payload)│
│  ├── visibility: Visibility                                                 │
│  ├── chain_depth: int                      (0 = inbound; +1 per re-entry)  │
│  ├── correlation_id: str | None                                             │
│  ├── metadata: dict[str, Any]                                               │
│  ├── timestamp: datetime                                                    │
│  │                                                                          │
│  HookResult (what a sync hook returns)                                      │
│  ├── action: "allow" | "block" | "modify"                                  │
│  ├── reason: str | None                  (why blocked/modified)             │
│  ├── modified_event: RoomEvent | None    (only for action=modify)          │
│  ├── inject: list[InjectedEvent]         (targeted replacement events)     │
│  ├── tasks: list[Task]                   (side effects)                    │
│  └── observations: list[Observation]     (side effects)                    │
│                                                                             │
│  InjectedEvent (targeted event from a hook)                                 │
│  ├── content: EventContent               (what to send)                    │
│  ├── target_channel: str | None          (one specific channel)            │
│  ├── target_channels: list[str] | None   (multiple specific channels)     │
│  ├── visibility: Visibility = all        (fallback if no target)           │
│  └── source_label: str = "system"        (identifies injected event source)│
│                                                                             │
│  ChannelOutput (what a channel produces when processing an event)           │
│  ├── events: list[RoomEvent]           (→ subject to permissions)           │
│  ├── tasks: list[Task]                 (→ always stored, even if muted)     │
│  ├── observations: list[Observation]   (→ always stored)                    │
│  └── metadata_updates: dict            (→ enrich Room metadata)             │
│  Note: Routing suggestions (escalate, etc.) are expressed as Tasks.        │
│                                                                             │
│  DeliveryResult (what deliver() returns)                                    │
│  ├── status: sent | queued | failed                                         │
│  ├── provider_message_id: str | None                                        │
│  ├── error: str | None                                                      │
│  └── metadata: dict[str, Any]          (provider-specific delivery info)    │
│                                                                             │
│  Participant                                                                │
│  ├── id: str                                                                │
│  ├── role: ParticipantRole              (end_user | agent | system)         │
│  ├── identity: Identity | None          (None if pending)                   │
│  ├── identification: IdentificationStatus (identified | pending)            │
│  ├── candidates: list[Identity] | None  (possible identities, if pending)  │
│  ├── connected_via: list[str]           (channel IDs)                      │
│  ├── display_name: str | None           (resolved name or fallback)        │
│  ├── status: ParticipantStatus          (active | idle | left)             │
│  ├── resolved_at: datetime | None                                          │
│  ├── resolved_by: str | None            ("auto" | "advisor" | hook name)   │
│  └── joined_at: datetime                                                   │
│                                                                             │
│  Identity                                                                   │
│  ├── id: str                                                                │
│  ├── organization_id: str                                                   │
│  ├── display_name: str | None                                               │
│  ├── channel_addresses: dict[ChannelType, list[str]]                       │
│  ├── external_id: str | None            (CRM contact ID)                   │
│  └── metadata: dict[str, Any]                                               │
│                                                                             │
│  IdentityResult (what IdentityResolver.resolve() returns)                   │
│  ├── status: "resolved" | "ambiguous" | "unknown"                          │
│  ├── participant: Participant | None    (if resolved)                       │
│  ├── candidates: list[Identity]         (if ambiguous, 2+)                 │
│  ├── address: str                       (raw channel address)              │
│  └── channel_type: ChannelType                                              │
│                                                                             │
│  IdentityHookResult (what on_identity_ambiguous/unknown hook returns)       │
│  ├── resolved(identity)                 (we know who — proceed)            │
│  ├── pending(display_name, candidates)  (advisor resolves later)           │
│  ├── challenge(inject)                  (ask sender to self-identify)      │
│  └── reject(reason)                     (do not create room/participant)   │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                           ENUMS (value types)                                │
│                                                                             │
│  ChannelType:           sms | email | websocket | http | whatsapp | ai |   │
│                         custom                                              │
│  ChannelCategory:       transport | intelligence | integration              │
│  ChannelDirection:      inbound | outbound | bidirectional                  │
│  ChannelMediaType:      text | audio | video                                │
│  EventStatus:           delivered | blocked | failed                        │
│  IdentificationStatus:  identified | pending                                │
│  Access:                read | write | read_write                           │
│  Visibility:            all | channels(list[str]) | internal                │
│  EventType:             message | typing | read_receipt | delivery_status | │
│                         channel_attached | channel_detached |               │
│                         channel_muted | channel_unmuted | channel_updated | │
│                         participant_joined | participant_left |             │
│                         participant_identified | task_created |             │
│                         observation | system | custom                       │
│  HookTrigger:           before_broadcast | after_broadcast |                │
│                         on_channel_attached | on_channel_detached |         │
│                         on_channel_muted | on_channel_unmuted |             │
│                         on_room_created | on_room_closed |                  │
│                         on_task_created | on_identity_ambiguous |           │
│                         on_identity_unknown | on_participant_identified |   │
│                         on_error                                            │
│  ParticipantRole:       end_user | agent | system                           │
│  ParticipantStatus:     active | idle | left                                │
│  RoomStatus:            active | paused | closed | archived                 │
│  DeliveryMode:          instant | async | batch                             │
└─────────────────────────────────────────────────────────────────────────────┘
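
How Access, muted, and Visibility might combine into a routing decision is sketched below. This is a guess at the mechanics, not the EventRouter's actual logic: `Binding` keeps only the permission fields, visibility is limited to "all" or an explicit set of channel IDs (the "internal" value is left out), and skipping the source channel is an assumption. Muted channels still receive events here, following "muting silences the voice, not the brain".

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Binding:
    """Trimmed ChannelBinding: only the permission fields."""
    channel_id: str
    access: str = "read_write"      # read | write | read_write
    muted: bool = False


def can_speak(binding: Binding) -> bool:
    """A channel's room events are broadcast only if it may write and is not muted."""
    return binding.access in ("write", "read_write") and not binding.muted


def should_receive(binding: Binding, source_channel: str, visibility="all") -> bool:
    """Decide whether a binding sees an event.

    `visibility` is either the string "all" or a set of channel IDs.
    """
    if binding.channel_id == source_channel:
        return False                 # assumption: no echo back to the sender
    if binding.access not in ("read", "read_write"):
        return False                 # write-only bindings never read
    if visibility == "all":
        return True
    return isinstance(visibility, (set, frozenset)) and binding.channel_id in visibility


def recipients(bindings: list[Binding], source_channel: str, visibility="all") -> list[str]:
    """Channel IDs that should receive an event, in binding order."""
    return [b.channel_id for b in bindings if should_receive(b, source_channel, visibility)]
```

Keeping `can_speak` and `should_receive` separate reflects the two directions a binding restricts: what a channel may say into the room and what it may hear from it.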

How everything composes — from config to running framework:

from roomkit import RoomKit
from roomkit.channels.sms import SMSChannel
from roomkit.channels.websocket import WebSocketChannel
from roomkit.channels.ai import AIChannel
from roomkit.providers.sms import TwilioSMSProvider, TwilioConfig
from roomkit.providers.ai import PydanticAIProvider
from roomkit.store import InMemoryStore
from pydantic_ai import Agent

# 1. Provider configs (typed, validated at startup)
twilio_config = TwilioConfig(
    account_sid="AC...",
    auth_token="secret",        # SecretStr — safe from accidental logging
    from_number="+15551234567",
)

# 2. Providers (created from configs)
sms_provider = TwilioSMSProvider(config=twilio_config)
ai_provider = PydanticAIProvider(
    agent=Agent(model="anthropic:claude-sonnet-4-5", instructions="..."),
)

# 3. Channels (created from providers)
sms_channel = SMSChannel(provider=sms_provider)       # direction=BIDIRECTIONAL
ws_channel = WebSocketChannel()                         # direction=BIDIRECTIONAL
ai_channel = AIChannel(provider=ai_provider, name="support")  # direction=BIDIRECTIONAL

# 4. RoomKit (orchestrates everything)
fw = RoomKit(
    store=InMemoryStore(),                               # or PostgresStore from integration service
    identity_resolver=CRMIdentityResolver(crm_client),   # integrator-defined IdentityResolver
)
fw.register_channel(sms_channel)
fw.register_channel(ws_channel)
fw.register_channel(ai_channel)

# 5. Hooks (integrator's business logic)
@fw.hook(trigger="on_room_created", execution="async")
async def setup_room(room: RoomContext):
    await fw.attach_channel(room.id, "support",
                            access="read_write", visibility="all",
                            metadata={"language": "fr"})

# 6. RoomKit events (real-time, global)
@fw.on("room_created")
async def notify_dashboard(event: FrameworkEvent):
    await dashboard.broadcast(event)

# 7. Integration service (NOT part of the library)
# The integrator wraps fw.* calls in their own web framework:
from fastapi import FastAPI, Request, Response
app = FastAPI()

@app.post("/webhooks/sms/twilio")
async def twilio_webhook(request: Request):
    payload = await request.form()
    await fw.process_inbound("sms_twilio_1", dict(payload))
    return Response(status_code=200)

@app.post("/api/v1/rooms")
async def create_room(body: CreateRoomRequest):
    return await fw.create_room(organization_id=body.organization_id)

# MCP and WebSocket endpoints are also mounted by the integration service.

Appendix C: Evolution Summary

Version | Key Insight
v1 | AI is a layer above channels. Adapters normalize, AI processes, adapters send.
v2 | AI is a channel (same abstraction as SMS). Observer/hybrid modes. Tasks.
v3 | Room-first. Human ↔ Human is the base case. Hooks for blocking. Provider abstraction. Dynamic channels. REST + MCP.
v4 | Primitives over opinions. Replace ChannelMode enum with access/mute/visibility primitives. Two output paths (events vs side effects). Framework provides mechanics, integrator provides logic.
v5 | AI provider abstraction + channel metadata + media types. AI follows same provider pattern as SMS (AIProvider ABC → PydanticAIProvider, AnthropicDirectProvider, etc.). Three-level metadata (channel instance, binding, event). Channel media types (text, audio, video) for future-proofing.
v6 | Channel direction + real-time + class hierarchy. Channels declare direction (inbound/outbound/bidirectional). Three methods: handle_inbound, deliver, on_event. Real-time event system (room events + framework events). WebSocket-native with typing indicators. Typed ProviderConfig. Complete class hierarchy.
v7 | CPaaS-validated. Research of Twilio, Sinch, Vonage, MessageBird, Infobip. Added: sequential event indexing, read horizon tracking, room timers (auto-transitions), correlation ID, content transcoding, channel fallback, template support. Gap analysis documented in cpaas-comparison.md.
v8 | Hook injection + event status + use case validation. Sync hooks can now block AND inject targeted replacement events to specific channels. Events carry status (delivered/blocked/failed) and blocked_by for audit trails. HookResult extended with inject, tasks, observations. New InjectedEvent model for targeted delivery. Use Case 6 (SIN sensitivity scanning) validates blocking, targeted responses, and compliance logging.
v9 | Identity resolution pipeline + ambiguity handling. IdentityResolver ABC returns IdentityResult with resolved/ambiguous/unknown status. New hook triggers: on_identity_ambiguous, on_identity_unknown, on_participant_identified. Participant model extended with identification status (identified/pending), candidates, resolved_at, resolved_by. IdentityHookResult with 4 strategies: resolved, pending, challenge, reject. REST API: POST /rooms/{id}/participants/{pid}/resolve. Use Case 7 (shared family phone number) validates ambiguous identity, advisor manual resolution, audit trail.
v10 | Library/service boundary + consistency fixes. Aligned project structure with technical requirements (library has no web framework, API, MCP, or PostgresStore). Removed RoutingDecision from ChannelOutput (use Tasks). Added RoomRouter ABC for pluggable inbound room routing. Documented transcoding pipeline location (Event Router before deliver). Fixed RoomLockManager unbounded growth (LRU eviction). Added idempotency (webhook dedup via provider_message_id, client dedup via correlation_id). Resolved stale open questions (Q1, Q3, Q4, Q5). Updated CPaaS comparison to reflect implemented features.
v11 (current) Event chain depth limit + Quick Start + Named patterns + Contributor guide. Added chain_depth: int to RoomEvent (0 for human/inbound, incremented on channel re-entry). Configurable max_event_chain_depth on Framework (default 10). When depth exceeded, event is blocked via existing status=blocked + blocked_by="event_chain_depth_limit" mechanism — no new enums or status types. Observation emitted for integrator visibility. Side effects still flow (consistent with mute). Added 5-minute Quick Start example (§2). Named patterns table (Direct, Assistant, Observer, Muted, Internal) in §3.3. Contributor guide skeleton (Appendix D). All open questions resolved: Q2 (let both respond), Q6 (roomkit), Q7 (independent open source).
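
The v11 chain depth limit can be sketched as follows. This is a minimal illustration, not the library's implementation: the RoomEvent class below is a reduced, hypothetical stand-in, the helper names reenter and enforce_depth_limit are invented for the example, and it assumes "exceeded" means strictly greater than the configured maximum.

```python
from dataclasses import dataclass, replace
from typing import Optional

MAX_EVENT_CHAIN_DEPTH = 10  # stands in for max_event_chain_depth on Framework


@dataclass(frozen=True)
class RoomEvent:
    """Hypothetical, reduced stand-in for roomkit's RoomEvent."""
    text: str
    chain_depth: int = 0              # 0 for human/inbound events
    status: str = "delivered"
    blocked_by: Optional[str] = None


def reenter(parent: RoomEvent, text: str) -> RoomEvent:
    """Build the event a channel emits in response to `parent` (depth + 1)."""
    return RoomEvent(text=text, chain_depth=parent.chain_depth + 1)


def enforce_depth_limit(
    event: RoomEvent, max_depth: int = MAX_EVENT_CHAIN_DEPTH
) -> RoomEvent:
    """Mark the event as blocked once its chain depth exceeds the limit."""
    if event.chain_depth > max_depth:
        # Reuses the existing status/blocked_by fields: no new enum values.
        return replace(
            event, status="blocked", blocked_by="event_chain_depth_limit"
        )
    return event


# Two channels answering each other: the 11th re-entry is blocked.
event = RoomEvent(text="hello")
for _ in range(11):
    event = enforce_depth_limit(reenter(event, "reply"))
```

Because the check only sets status and blocked_by, audit code that already inspects blocked events needs no changes to see depth-limit blocks.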

Appendix D: Contributor Guide

This appendix prepares roomkit for open-source contribution. It covers the three most common extension points: custom providers, custom channels, and the tests each requires.

D.1 Implementing a Custom SMSProvider

A custom SMS provider connects a new SMS service (e.g., Vonage, AWS SNS) to roomkit.

Step 1: Implement the ABC

# roomkit/providers/sms/vonage.py
import httpx

from roomkit.providers.sms.base import SMSProvider, InboundMessage, ProviderResult

class VonageSMSProvider(SMSProvider):
    name = "vonage"

    def __init__(self, api_key: str, api_secret: str, from_number: str):
        self.api_key = api_key
        self.api_secret = api_secret
        self.from_number = from_number

    async def send_sms(self, to: str, body: str) -> ProviderResult:
        # Call Vonage API via httpx
        async with httpx.AsyncClient() as client:
            resp = await client.post("https://rest.nexmo.com/sms/json", json={
                "api_key": self.api_key,
                "api_secret": self.api_secret,
                "from": self.from_number,
                "to": to,
                "text": body,
            })
            data = resp.json()
            return ProviderResult(
                status="sent" if data["messages"][0]["status"] == "0" else "failed",
                provider_message_id=data["messages"][0].get("message-id"),
            )

    async def parse_webhook(self, payload: dict) -> InboundMessage:
        return InboundMessage(
            from_address=payload["msisdn"],
            to_address=payload["to"],
            text=payload.get("text", ""),
            raw_payload=payload,
        )

Step 2: Add a typed config

# roomkit/config/providers.py
from pydantic import BaseModel, SecretStr

class VonageConfig(BaseModel):
    api_key: str
    api_secret: SecretStr
    from_number: str

Step 3: Add the extra dependency

# pyproject.toml
[project.optional-dependencies]
vonage = ["httpx>=0.27"]
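
Since the dependency is optional, the provider module may be imported in an environment where it is missing. One possible way to fail with an actionable message is a guarded import. The helper name require_extra and the pip distribution name roomkit used in the message are assumptions made for this sketch.

```python
import importlib
from types import ModuleType


def require_extra(module: str, extra: str) -> ModuleType:
    """Import an optional dependency or raise an ImportError naming the extra."""
    try:
        return importlib.import_module(module)
    except ImportError as exc:
        # "roomkit" as the distribution name is an assumption of this sketch.
        raise ImportError(
            f"{module!r} is required for this provider. "
            f"Install it with: pip install 'roomkit[{extra}]'"
        ) from exc
```

A provider module could then call httpx = require_extra("httpx", "vonage") instead of a bare import httpx.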

D.2 Implementing a Custom AIProvider

A custom AI provider connects a new LLM service or framework to roomkit.

Step 1: Implement the ABC

# roomkit/providers/ai/ollama.py
import httpx

from roomkit.providers.ai.base import AIProvider, AIContext, AIMessage, AIResponse

class OllamaProvider(AIProvider):
    name = "ollama"
    model_name: str

    def __init__(self, model: str, base_url: str = "http://localhost:11434"):
        self.model_name = model
        self.base_url = base_url

    async def generate(
        self,
        messages: list[AIMessage],
        context: AIContext,
    ) -> AIResponse:
        async with httpx.AsyncClient() as client:
            resp = await client.post(f"{self.base_url}/api/chat", json={
                "model": self.model_name,
                "messages": [
                    {"role": m.role, "content": m.content} for m in messages
                ],
                # Ollama streams NDJSON by default; request a single JSON object.
                "stream": False,
            })
            data = resp.json()
            return AIResponse(text=data["message"]["content"])

Key contract:

  • generate() receives the full conversation history as list[AIMessage] and an AIContext with room metadata, target channel capabilities, and participants.
  • Return an AIResponse. Optionally include tasks and observations if the AI produces side effects.
  • The provider is responsible for adapting the context to the LLM's API format.
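
The contract can be illustrated with a deterministic provider that needs no network. The dataclasses below are hypothetical stand-ins for roomkit's AIMessage, AIContext, and AIResponse. Their field names (room_metadata, tasks, observations) and the shape of an observation are assumptions inferred from the description above, not the library's actual definitions.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AIMessage:          # stand-in for roomkit's AIMessage
    role: str
    content: str


@dataclass
class AIContext:          # stand-in: only room metadata is modelled here
    room_metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class AIResponse:         # stand-in: text plus optional side effects
    text: str
    tasks: list[dict[str, Any]] = field(default_factory=list)
    observations: list[dict[str, Any]] = field(default_factory=list)


class EchoProvider:
    """Deterministic provider: echoes the last user turn, no external calls."""
    name = "echo"

    async def generate(
        self, messages: list[AIMessage], context: AIContext
    ) -> AIResponse:
        # Adapting history to the backend is the provider's job; this
        # "backend" only needs the most recent user message.
        last_user = next(
            (m.content for m in reversed(messages) if m.role == "user"), ""
        )
        language = context.room_metadata.get("language", "en")
        response = AIResponse(text=f"[{language}] {last_user}")
        if not last_user:
            # Side effect reported alongside the reply (shape is illustrative).
            response.observations.append({"type": "empty_prompt"})
        return response


history = [
    AIMessage("user", "Bonjour"),
    AIMessage("assistant", "Salut"),
    AIMessage("user", "Merci"),
]
result = asyncio.run(
    EchoProvider().generate(history, AIContext({"language": "fr"}))
)
empty = asyncio.run(EchoProvider().generate([], AIContext()))
```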

D.3 Implementing a Custom Channel

Implement a custom channel for transports beyond SMS, Email, WebSocket, and AI (e.g., Slack, Discord, voice).

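# roomkit/channels/slack.py
from typing import Any

from roomkit.channels.base import Channel, ChannelType, ChannelCategory, ChannelDirection
# ChannelMediaType, ChannelCapabilities, InboundResult, DeliveryResult,
# ChannelOutput, ChannelBinding, RoomEvent, and RoomContext are also needed;
# import them from the roomkit modules that define them.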

class SlackChannel(Channel):
    type = ChannelType.CUSTOM
    category = ChannelCategory.TRANSPORT
    direction = ChannelDirection.BIDIRECTIONAL
    media_types = [ChannelMediaType.TEXT]

    def __init__(self, bot_token: str, id: str = "slack"):
        self.id = id
        self.bot_token = bot_token
        self.capabilities = ChannelCapabilities(
            max_text_length=40000,
            supports_rich_text=True,
            supports_buttons=True,
            supports_threading=True,
            supports_media=True,
        )

    async def handle_inbound(self, raw_payload: Any) -> InboundResult:
        # Parse Slack event payload → RoomEvent
        ...

    async def deliver(self, event: RoomEvent, binding: ChannelBinding) -> DeliveryResult:
        # Post message to Slack channel/DM via Slack API
        ...

    async def on_event(self, event: RoomEvent, room: RoomContext) -> ChannelOutput:
        # Transport channels usually return empty output
        return ChannelOutput()
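
The parsing step left elided in handle_inbound could look roughly like the sketch below, which normalizes a Slack Events API envelope into a plain dict. The function name parse_slack_message is hypothetical, and the output keys simply mirror the InboundMessage fields from D.1 plus provider_message_id. How the result is wrapped into an InboundResult is left to the channel.

```python
from typing import Any, Optional


def parse_slack_message(payload: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return normalized fields for a user message, or None if not applicable."""
    # Only event callbacks carry messages; the url_verification handshake
    # and other envelope types are ignored here.
    if payload.get("type") != "event_callback":
        return None
    event = payload.get("event", {})
    # Skip non-message events, edits/joins (subtype set), and bot posts,
    # which would otherwise echo the channel's own deliveries back in.
    if event.get("type") != "message" or event.get("subtype") or event.get("bot_id"):
        return None
    if "user" not in event or "channel" not in event:
        return None
    return {
        "from_address": event["user"],
        "to_address": event["channel"],
        "text": event.get("text", ""),
        # Slack's ts is unique per channel, so it can serve as a dedup key.
        "provider_message_id": event.get("ts"),
        "raw_payload": payload,
    }


sample = {
    "type": "event_callback",
    "event": {
        "type": "message",
        "user": "U123",
        "channel": "C456",
        "text": "Hello",
        "ts": "1355517523.000005",
    },
}
parsed = parse_slack_message(sample)
```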

D.4 Required Tests for a New Provider

Every provider must ship with tests that validate the ABC contract. Use the following pattern:

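# tests/unit/test_providers_vonage.py
# httpx_mock is the fixture provided by the pytest-httpx plugin.
import pytest
from roomkit.providers.sms.vonage import VonageSMSProvider

# Run the async tests below under pytest-asyncio.
pytestmark = pytest.mark.asyncio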

class TestVonageSMSProvider:
    """Contract tests for VonageSMSProvider."""

    @pytest.fixture
    def provider(self):
        return VonageSMSProvider(
            api_key="test", api_secret="test", from_number="+15550000000",
        )

    async def test_send_sms_returns_provider_result(self, provider, httpx_mock):
        """send_sms() must return ProviderResult with status and provider_message_id."""
        httpx_mock.add_response(json={"messages": [{"status": "0", "message-id": "abc123"}]})
        result = await provider.send_sms("+15551234567", "Hello")
        assert result.status == "sent"
        assert result.provider_message_id == "abc123"

    async def test_send_sms_failure(self, provider, httpx_mock):
        """send_sms() must return status=failed on provider error."""
        httpx_mock.add_response(json={"messages": [{"status": "1", "error-text": "Throttled"}]})
        result = await provider.send_sms("+15551234567", "Hello")
        assert result.status == "failed"

    async def test_parse_webhook(self, provider):
        """parse_webhook() must return InboundMessage with from/to/text."""
        msg = await provider.parse_webhook({
            "msisdn": "+15551234567",
            "to": "+15559876543",
            "text": "Bonjour",
        })
        assert msg.from_address == "+15551234567"
        assert msg.text == "Bonjour"
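
Beyond per-method tests, the structural part of the contract (every abstract method implemented, every one async) can be checked generically. The sketch below uses only the standard library. The SMSProvider class shown is a reduced stand-in for the real ABC, and contract_violations is a hypothetical helper, not part of roomkit.

```python
import inspect
from abc import ABC, abstractmethod


class SMSProvider(ABC):
    """Reduced stand-in for roomkit's SMSProvider ABC."""

    @abstractmethod
    async def send_sms(self, to: str, body: str): ...

    @abstractmethod
    async def parse_webhook(self, payload: dict): ...


def contract_violations(abc_cls: type, impl_cls: type) -> list[str]:
    """List abstract methods that impl_cls leaves abstract or defines as sync."""
    problems = []
    for name in sorted(getattr(abc_cls, "__abstractmethods__", ())):
        method = getattr(impl_cls, name, None)
        if method is None or getattr(method, "__isabstractmethod__", False):
            problems.append(f"{name}: not implemented")
        elif not inspect.iscoroutinefunction(method):
            problems.append(f"{name}: must be async def")
    return problems


class GoodProvider(SMSProvider):
    async def send_sms(self, to, body):
        return None

    async def parse_webhook(self, payload):
        return None


class SloppyProvider(SMSProvider):
    def send_sms(self, to, body):  # sync by mistake
        return None
    # parse_webhook is missing entirely


good = contract_violations(SMSProvider, GoodProvider)
sloppy = contract_violations(SMSProvider, SloppyProvider)
```

In a test suite, a single parametrized test could assert contract_violations(...) == [] for every registered provider class.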

Checklist for new providers:

  • [ ] Implements all abstract methods from the ABC
  • [ ] All methods are async def
  • [ ] Has a typed config (pydantic.BaseModel with SecretStr for credentials)
  • [ ] Has a mock variant (no external deps) for integration tests
  • [ ] Unit tests cover: success path, failure path, webhook parsing
  • [ ] Added as optional extra in pyproject.toml
  • [ ] HTTP calls use httpx.AsyncClient (not requests)
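
The "mock variant" item can be sketched as an in-memory provider that records what was sent. ProviderResult and InboundMessage below are hypothetical stand-ins with the fields used in D.1. The fail_numbers parameter and the webhook key names ("from", "to", "text") are inventions of this example.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ProviderResult:     # stand-in for roomkit's ProviderResult
    status: str
    provider_message_id: Optional[str] = None


@dataclass
class InboundMessage:     # stand-in for roomkit's InboundMessage
    from_address: str
    to_address: str
    text: str
    raw_payload: dict[str, Any] = field(default_factory=dict)


class MockSMSProvider:
    """In-memory provider: no network, no extra dependencies."""
    name = "mock"

    def __init__(self, fail_numbers: Optional[set[str]] = None):
        self.sent: list[tuple[str, str]] = []      # (to, body) per success
        self.fail_numbers = fail_numbers or set()  # numbers that always fail

    async def send_sms(self, to: str, body: str) -> ProviderResult:
        if to in self.fail_numbers:
            return ProviderResult(status="failed")
        self.sent.append((to, body))
        return ProviderResult(
            status="sent", provider_message_id=f"mock-{len(self.sent)}"
        )

    async def parse_webhook(self, payload: dict[str, Any]) -> InboundMessage:
        return InboundMessage(
            from_address=payload["from"],
            to_address=payload["to"],
            text=payload.get("text", ""),
            raw_payload=payload,
        )


async def demo():
    provider = MockSMSProvider(fail_numbers={"+15550000001"})
    ok = await provider.send_sms("+15551234567", "Hello")
    bad = await provider.send_sms("+15550000001", "Hello")
    inbound = await provider.parse_webhook(
        {"from": "+15551234567", "to": "+15559876543", "text": "Bonjour"}
    )
    return provider, ok, bad, inbound


provider, ok, bad, inbound = asyncio.run(demo())
```

Recording sent messages on the instance lets integration tests assert on outbound traffic without patching HTTP.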