RoomKit

RoomKit(store=None, identity_resolver=None, identity_channel_types=None, inbound_router=None, lock_manager=None, realtime=None, max_chain_depth=5, identity_timeout=10.0, process_timeout=30.0, stt=None, tts=None, voice=None, task_runner=None, delivery_strategy=None, telemetry=None, inbound_rate_limit=None)

Central orchestrator tying rooms, channels, hooks, and storage.

Initialise the RoomKit orchestrator.

Parameters:

Name Type Description Default
store ConversationStore | None

Persistent storage backend. Defaults to InMemoryStore.

None
identity_resolver IdentityResolver | None

Optional resolver for identifying inbound senders.

None
identity_channel_types set[ChannelType] | None

Restrict identity resolution to specific channel types. If None (default), resolution runs for all channels. For example, {ChannelType.SMS} resolves identity for SMS only.

None
inbound_router InboundRoomRouter | None

Strategy for routing inbound messages to rooms. Defaults to DefaultInboundRoomRouter.

None
lock_manager RoomLockManager | None

Per-room locking backend. Defaults to InMemoryLockManager. For multi-process deployments, supply a distributed implementation (e.g. Redis-backed).

None
realtime RealtimeBackend | None

Realtime backend for ephemeral events (typing, presence). Defaults to InMemoryRealtime. For multi-process deployments, supply a distributed implementation (e.g. Redis pub/sub).

None
max_chain_depth int

Maximum reentry chain depth to prevent infinite loops.

5
identity_timeout float

Timeout in seconds for identity resolution calls.

10.0
process_timeout float

Timeout in seconds for the locked processing phase.

30.0
stt STTProvider | None

Optional speech-to-text provider for transcription.

None
tts TTSProvider | None

Optional text-to-speech provider for synthesis.

None
voice VoiceBackend | None

Optional voice backend for real-time audio transport.

None
task_runner TaskRunner | None

Pluggable backend for delegated background tasks. Defaults to InMemoryTaskRunner.

None
delivery_strategy BackgroundTaskDeliveryStrategy | None

Controls proactive delivery of background task results. When set, strategy.deliver() is called after system prompt injection and the ON_TASK_COMPLETED hook. Can be overridden per-task via delegate().

None
telemetry TelemetryConfig | TelemetryProvider | None

Optional telemetry provider or config for span/metric collection. Accepts a TelemetryProvider instance or a TelemetryConfig. Defaults to NoopTelemetryProvider.

None
inbound_rate_limit RateLimit | None

Optional rate limit applied to all inbound messages before any processing. Messages exceeding the limit are dropped with reason="rate_limited". Keyed per channel_id.

None
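The inbound_rate_limit behaviour described above (over-limit messages are dropped, with counters keyed per channel_id) can be pictured with a small standalone limiter. This is only a sketch of the idea, not RoomKit's implementation: the class name SlidingWindowLimiter, its max_events/window parameters, and the sliding-window policy are all assumptions made for illustration.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Hypothetical per-key limiter: at most `max_events` per `window` seconds."""

    def __init__(self, max_events: int, window: float, clock=time.monotonic):
        self.max_events = max_events
        self.window = window
        self._clock = clock  # injectable so the behaviour can be tested
        self._seen = defaultdict(deque)  # channel_id -> recent timestamps

    def allow(self, channel_id: str) -> bool:
        now = self._clock()
        stamps = self._seen[channel_id]
        # Forget timestamps that have left the window.
        while stamps and now - stamps[0] >= self.window:
            stamps.popleft()
        if len(stamps) >= self.max_events:
            return False  # a caller would drop the message as "rate_limited"
        stamps.append(now)
        return True
```

Because each channel_id has its own queue of timestamps, a noisy channel exhausts only its own budget and does not affect other channels.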

store property

store

The backing conversation store.

hook_engine property

hook_engine

The hook engine used for sync/async hook pipelines.

realtime property

realtime

The realtime backend for ephemeral events.

get_timeline async

get_timeline(room_id, offset=0, limit=50, visibility_filter=None)

Query the event timeline for a room.

list_tasks async

list_tasks(room_id, status=None)

List tasks for a room, optionally filtered by status.

list_observations async

list_observations(room_id)

List observations for a room.

send_event async

send_event(room_id, channel_id, content, event_type=MESSAGE, chain_depth=0, participant_id=None, metadata=None, visibility='all', provider=None, response_visibility=None)

Send an event directly into a room from a channel.

Parameters:

Name Type Description Default
room_id str

Target room ID

required
channel_id str

Source channel ID

required
content Any

Event content (TextContent, RichContent, etc.)

required
event_type EventType

Type of event (default MESSAGE)

MESSAGE
chain_depth int

Depth in response chain (for loop prevention)

0
participant_id str | None

Optional participant/sender ID for the event source

None
metadata dict[str, Any] | None

Optional event metadata

None
visibility str

Event visibility ("all" or "internal")

'all'
provider str | None

Optional provider/backend name for event attribution

None
response_visibility str | None

Controls where the AI's response is delivered. Uses the same vocabulary as visibility. None means no restriction.

None
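To illustrate how chain_depth and max_chain_depth can stop a reply loop, here is a minimal standalone sketch. The Event dataclass, echo_agent, and process are hypothetical names, and the exact comparison (an event is blocked once its depth exceeds the cap) is an assumption rather than documented behaviour.

```python
from dataclasses import dataclass

MAX_CHAIN_DEPTH = 5  # mirrors the documented default of max_chain_depth


@dataclass
class Event:
    content: str
    chain_depth: int = 0


def echo_agent(event: Event) -> Event:
    """Hypothetical responder that always replies, one level deeper."""
    return Event(content=f"re: {event.content}", chain_depth=event.chain_depth + 1)


def process(event: Event, max_depth: int = MAX_CHAIN_DEPTH) -> list:
    """Deliver an event and its chained replies until the depth cap is hit."""
    delivered = []
    current = event
    # Assumption: an event is blocked once its depth exceeds the cap.
    while current.chain_depth <= max_depth:
        delivered.append(current)
        current = echo_agent(current)
    return delivered
```

Without the cap, a responder that answers every event would loop forever; with it, the chain ends after a bounded number of hops.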

connect_websocket async

connect_websocket(channel_id, connection_id, send_fn, *, stream_send_fn=None)

Register a WebSocket connection and emit a framework event.

disconnect_websocket async

disconnect_websocket(channel_id, connection_id)

Unregister a WebSocket connection and emit a framework event.

mark_read async

mark_read(room_id, channel_id, event_id)

Mark an event as read for a channel.

mark_all_read async

mark_all_read(room_id, channel_id)

Mark all events as read for a channel.

publish_typing async

publish_typing(room_id, user_id, is_typing=True, data=None)

Publish a typing indicator for a user in a room.

Parameters:

Name Type Description Default
room_id str

The room to publish the typing event in.

required
user_id str

The user who is typing.

required
is_typing bool

True for typing_start, False for typing_stop.

True
data dict[str, Any] | None

Optional additional data (e.g., {"name": "User Name"}).

None

publish_presence async

publish_presence(room_id, user_id, status)

Publish a presence update for a user in a room.

Parameters:

Name Type Description Default
room_id str

The room to publish the presence event in.

required
user_id str

The user whose presence changed.

required
status str

One of "online", "away", or "offline".

required

publish_read_receipt async

publish_read_receipt(room_id, user_id, event_id)

Publish a read receipt for a user in a room.

Parameters:

Name Type Description Default
room_id str

The room to publish the read receipt in.

required
user_id str

The user who read the message.

required
event_id str

The ID of the event that was read.

required

subscribe_room async

subscribe_room(room_id, callback)

Subscribe to ephemeral events for a room.

Parameters:

Name Type Description Default
room_id str

The room to subscribe to.

required
callback EphemeralCallback

Async callback invoked for each ephemeral event.

required

Returns:

Type Description
str

A subscription ID that can be used to unsubscribe.

unsubscribe_room async

unsubscribe_room(subscription_id)

Unsubscribe from ephemeral events.

Parameters:

Name Type Description Default
subscription_id str

The subscription ID returned by subscribe_room.

required

Returns:

Type Description
bool

True if the subscription existed and was removed.
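The subscribe/unsubscribe contract (an async callback per room, a string subscription ID, and a bool result on removal) can be sketched with a tiny in-memory stand-in. TinyRealtime, its publish method, and the event dictionaries are hypothetical; the real EphemeralCallback signature and event shape may differ.

```python
import asyncio
import uuid
from collections.abc import Awaitable, Callable
from typing import Any

# Stand-in for EphemeralCallback; the real signature is an assumption here.
Callback = Callable[[dict], Awaitable[None]]


class TinyRealtime:
    """Hypothetical in-memory pub/sub keyed by room_id."""

    def __init__(self) -> None:
        self._subs = {}  # subscription_id -> (room_id, callback)

    async def subscribe_room(self, room_id: str, callback: Callback) -> str:
        sub_id = uuid.uuid4().hex
        self._subs[sub_id] = (room_id, callback)
        return sub_id

    async def unsubscribe_room(self, subscription_id: str) -> bool:
        # True only if the subscription existed and was removed.
        return self._subs.pop(subscription_id, None) is not None

    async def publish(self, room_id: str, event: dict) -> None:
        for sub_room, callback in list(self._subs.values()):
            if sub_room == room_id:
                await callback(event)


async def demo() -> tuple:
    rt = TinyRealtime()
    received = []

    async def on_event(event: dict) -> None:
        received.append(event)

    sub_id = await rt.subscribe_room("room-1", on_event)
    await rt.publish("room-1", {"type": "typing_start", "user_id": "u1"})
    await rt.publish("room-2", {"type": "typing_start", "user_id": "u2"})
    first = await rt.unsubscribe_room(sub_id)
    second = await rt.unsubscribe_room(sub_id)  # already removed
    await rt.publish("room-1", {"type": "typing_stop", "user_id": "u1"})
    return received, first, second
```

Running asyncio.run(demo()) delivers only the first room-1 event to the callback; the second unsubscribe call reports that nothing was left to remove.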

hook

hook(trigger, execution=SYNC, priority=0, name='', timeout=30.0, channel_types=None, channel_ids=None, directions=None)

Decorator to register a global hook.

Parameters:

Name Type Description Default
trigger HookTrigger

When the hook fires (BEFORE_BROADCAST, AFTER_BROADCAST, etc.)

required
execution HookExecution

SYNC (can block/modify) or ASYNC (fire-and-forget)

SYNC
priority int

Lower numbers run first (default: 0)

0
name str

Optional name for logging and removal

''
timeout float

Max execution time in seconds (default: 30.0)

30.0
channel_types set[ChannelType] | None

Only run for events from these channel types (None = all)

None
channel_ids set[str] | None

Only run for events from these channel IDs (None = all)

None
directions set[ChannelDirection] | None

Only run for events with these directions (None = all)

None
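The ordering and blocking rules in the table above (lower priority numbers run first; SYNC hooks can block or modify) can be sketched with a simplified registry. This is not RoomKit's hook engine: TinyHookRegistry is hypothetical, triggers are plain strings, and hooks here take and return text rather than the real event and context objects.

```python
from typing import Callable, Optional

Hook = Callable[[str], Optional[str]]  # returns new text, or None to block


class TinyHookRegistry:
    """Hypothetical registry of SYNC-style hooks that may modify or block text."""

    def __init__(self) -> None:
        self._hooks = {}  # trigger -> list of (priority, name, fn)

    def hook(self, trigger: str, priority: int = 0, name: str = ""):
        def decorator(fn: Hook) -> Hook:
            entries = self._hooks.setdefault(trigger, [])
            entries.append((priority, name or fn.__name__, fn))
            # Stable sort: equal priorities keep registration order.
            entries.sort(key=lambda entry: entry[0])
            return fn
        return decorator

    def run(self, trigger: str, text: str) -> Optional[str]:
        for _, _, fn in self._hooks.get(trigger, []):
            result = fn(text)
            if result is None:
                return None  # a hook blocked the event
            text = result
        return text


hooks = TinyHookRegistry()


@hooks.hook("before_broadcast", priority=10, name="shout")
def shout(text: str) -> Optional[str]:
    return text.upper()


@hooks.hook("before_broadcast", priority=0, name="filter")
def block_spam(text: str) -> Optional[str]:
    return None if "spam" in text else text.strip()
```

Although shout is registered first, block_spam runs before it because its priority number is lower, so a blocked message never reaches later hooks.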

on

on(event_type)

Decorator to register a framework event handler filtered by type.

identity_hook

identity_hook(trigger, channel_types=None, channel_ids=None, directions=None)

Decorator to register an identity-resolution hook.

The decorated function receives (event, context, id_result) and returns an IdentityHookResult or None.

Parameters:

Name Type Description Default
trigger HookTrigger

When the hook fires (ON_IDENTITY_AMBIGUOUS, ON_IDENTITY_UNKNOWN).

required
channel_types set[ChannelType] | None

Only run for events from these channel types (None = all).

None
channel_ids set[str] | None

Only run for events from these channel IDs (None = all).

None
directions set[ChannelDirection] | None

Only run for events with these directions (None = all).

None

add_room_hook

add_room_hook(room_id, trigger, execution, fn, priority=0, name='')

Add a hook for a specific room.

remove_room_hook

remove_room_hook(room_id, name)

Remove a room hook by name.

Exceptions

RoomNotFoundError

Bases: RoomKitError

Room does not exist.

ChannelNotFoundError

Bases: RoomKitError

Channel binding not found in room.

ChannelNotRegisteredError

Bases: RoomKitError

Channel type not registered.

Infrastructure

RoomLockManager

Bases: ABC

Abstract base for per-room locking.

Implement this to plug in any locking backend (Redis, Postgres advisory locks, etc.). The library ships with InMemoryLockManager for single-process deployments.

Implementations should be reentrant within the same execution context (including child tasks spawned by asyncio.gather): if a coroutine already holds the lock for a room and awaits code that tries to acquire the same room lock, the inner acquisition must succeed without deadlocking. This is required because tool handlers (e.g. handoff) may update room state while the inbound pipeline already holds the room lock.

locked abstractmethod async

locked(room_id)

Acquire an exclusive lock for room_id.

InMemoryLockManager

InMemoryLockManager(max_locks=1024)

Bases: RoomLockManager

In-process per-room asyncio locks with LRU eviction.

Reentrant within the same execution context: if the current context already holds the lock for a given room (including child tasks spawned by asyncio.gather), locked() yields immediately instead of deadlocking.

Suitable for single-process deployments. For multi-process or distributed setups, provide a custom RoomLockManager backed by Redis, Postgres advisory locks, or similar.

size property

size

Return the number of locks currently held.

locked async

locked(room_id)

Acquire the lock for a room (reentrant via ContextVar).

AuthCallback module-attribute

AuthCallback = Callable[[Any], Awaitable[dict[str, Any] | None]]

Async callback for transport authentication.

Receives the connection context (e.g. WebSocket, HTTP request) and returns a metadata dict on success or None to reject the connection.
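A callback matching this alias might look like the sketch below. FakeConnection, VALID_TOKENS, and the bearer-token header scheme are assumptions made for illustration; the actual connection object depends on the transport in use.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any, Optional

# Same shape as the documented alias, written with Optional for portability.
AuthCallback = Callable[[Any], Awaitable[Optional[dict]]]


@dataclass
class FakeConnection:
    """Hypothetical stand-in for a WebSocket or HTTP request."""
    headers: dict = field(default_factory=dict)


VALID_TOKENS = {"secret-token": "user-42"}  # illustrative only


async def token_auth(connection: Any) -> Optional[dict]:
    raw = connection.headers.get("authorization", "")
    token = raw.removeprefix("Bearer ")
    user_id = VALID_TOKENS.get(token)
    if user_id is None:
        return None  # reject the connection
    return {"user_id": user_id}  # metadata dict on success


authenticate: AuthCallback = token_auth
```

Returning a metadata dict accepts the connection, and returning None rejects it, which matches the contract described above.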

auth_context module-attribute

auth_context = ContextVar('auth_context', default=None)

Context variable holding auth metadata from the most recent authentication.

Set automatically before session_factory is called so the factory can read auth metadata without changing its signature:

from roomkit.voice.auth import auth_context

async def my_session_factory(websocket_id: str) -> VoiceSession:
    meta = auth_context.get()  # dict or None
    ...
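The caller's side of this pattern can be sketched without the library. The auth_context below is a local stand-in that mirrors the documented definition; the accept function, the dict-based session, and resetting the variable afterwards are assumptions, not confirmed RoomKit behaviour.

```python
import asyncio
from contextvars import ContextVar

# Local stand-in mirroring the documented definition.
auth_context = ContextVar("auth_context", default=None)


async def session_factory(websocket_id: str) -> dict:
    meta = auth_context.get()  # dict or None
    return {"id": websocket_id, "user": (meta or {}).get("user_id")}


async def accept(websocket_id: str, meta) -> dict:
    """Hypothetical transport step: publish auth metadata, then build the session."""
    token = auth_context.set(meta)
    try:
        return await session_factory(websocket_id)
    finally:
        auth_context.reset(token)  # assumption: restore the previous value
```

Because the metadata travels through the context rather than through arguments, session_factory keeps its single-parameter signature.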