Skip to content

Realtime Features

RoomKit supports two event categories: persistent events stored in the ConversationStore (messages, edits, deletions) and ephemeral events delivered in real time through the RealtimeBackend (typing indicators, presence, reactions, read receipts). This guide covers both.

Event Categories

Category Stored Delivered via Examples
Persistent Yes process_inbound() pipeline + broadcast Messages, edits, deletions
Ephemeral No RealtimeBackend pub/sub Typing, presence, reactions, tool calls

Typing Indicators

from __future__ import annotations

from roomkit import RoomKit

kit = RoomKit()

# User starts typing
await kit.publish_typing("room-1", "alice", is_typing=True)

# User stops typing
await kit.publish_typing("room-1", "alice", is_typing=False)

# With additional metadata
await kit.publish_typing("room-1", "alice", is_typing=True, data={"name": "Alice"})

Presence Tracking

Three standard statuses map to dedicated event types:

await kit.publish_presence("room-1", "alice", "online")
await kit.publish_presence("room-1", "alice", "away")
await kit.publish_presence("room-1", "alice", "offline")

# Custom status → EphemeralEventType.CUSTOM with data={"status": "in_meeting"}
await kit.publish_presence("room-1", "alice", "in_meeting")

Reactions

Add or remove emoji reactions on messages:

# Add a reaction
await kit.publish_reaction("room-1", "alice", target_event_id, emoji="thumbsup")

# Remove a reaction
await kit.publish_reaction("room-1", "alice", target_event_id, emoji="thumbsup", action="remove")

Read Receipts

Two complementary mechanisms — ephemeral for UI, persistent for state:

# Ephemeral: "seen" indicator (not stored)
await kit.publish_read_receipt("room-1", "alice", event_id)

# Persistent: durable read state (stored in ConversationStore)
await kit.mark_read("room-1", "ws-alice", event_id)
await kit.mark_all_read("room-1", "ws-alice")

Note

publish_read_receipt takes a user_id, while mark_read takes a channel_id. Ephemeral receipts are user-facing; persistent state is per-channel.


Subscribing to Events

Subscribe to all ephemeral events in a room:

from __future__ import annotations

from roomkit import RoomKit
from roomkit.realtime import EphemeralEvent, EphemeralEventType

kit = RoomKit()


async def on_event(event: EphemeralEvent) -> None:
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")
    elif event.type == EphemeralEventType.PRESENCE_ONLINE:
        print(f"{event.user_id} came online")
    elif event.type == EphemeralEventType.REACTION:
        print(f"{event.user_id} reacted with {event.data['emoji']}")
    elif event.type == EphemeralEventType.TOOL_CALL_START:
        tools = event.data["tool_calls"]
        print(f"AI calling: {[t['name'] for t in tools]}")


sub_id = await kit.subscribe_room("room-1", on_event)

# Later:
await kit.unsubscribe_room(sub_id)

EphemeralEventType

All 12 ephemeral event types:

Type Published by
TYPING_START publish_typing(is_typing=True)
TYPING_STOP publish_typing(is_typing=False)
PRESENCE_ONLINE publish_presence(status="online")
PRESENCE_AWAY publish_presence(status="away")
PRESENCE_OFFLINE publish_presence(status="offline")
READ_RECEIPT publish_read_receipt()
REACTION publish_reaction()
TOOL_CALL_START Automatic from AIChannel
TOOL_CALL_END Automatic from AIChannel
THINKING_START Automatic from AIChannel
THINKING_END Automatic from AIChannel
CUSTOM Direct publish

Message Editing

Edits are persistent events — stored and broadcast to all channels:

from __future__ import annotations

from roomkit import EventType, InboundMessage, RoomKit, TextContent
from roomkit.models.events import EditContent

kit = RoomKit()

await kit.process_inbound(InboundMessage(
    channel_id="ws-alice",
    sender_id="alice",
    event_type=EventType.EDIT,
    content=EditContent(
        target_event_id=original_msg_id,
        new_content=TextContent(body="Fixed typo!"),
    ),
))

The new_content field accepts any EventContent type — you can change a text message to rich content or vice versa.

Message Deletion

Deletions are also persistent events:

from __future__ import annotations

from roomkit import DeleteType, EventType, InboundMessage, RoomKit
from roomkit.models.events import DeleteContent

kit = RoomKit()

await kit.process_inbound(InboundMessage(
    channel_id="ws-alice",
    sender_id="alice",
    event_type=EventType.DELETE,
    content=DeleteContent(
        target_event_id=msg_id,
        delete_type=DeleteType.SENDER,
        reason="Changed my mind",
    ),
))
DeleteType Meaning
SENDER Original sender deletes their message
SYSTEM System-initiated (e.g., content policy)
ADMIN Admin/moderator removal

Tool Call Events

AIChannel automatically publishes TOOL_CALL_START and TOOL_CALL_END — subscribe to observe:

async def on_tool(event: EphemeralEvent) -> None:
    if event.type == EphemeralEventType.TOOL_CALL_START:
        tools = event.data["tool_calls"]
        print(f"Calling: {[t['name'] for t in tools]}")
    elif event.type == EphemeralEventType.TOOL_CALL_END:
        print(f"Completed in {event.data.get('duration_ms')}ms")

sub_id = await kit.subscribe_room("room-1", on_tool)

Use these to display "AI is searching..." indicators in a chat UI.

Custom Ephemeral Events

For application-specific events:

from __future__ import annotations

from roomkit.realtime import EphemeralEvent, EphemeralEventType

event = EphemeralEvent(
    room_id="room-1",
    type=EphemeralEventType.CUSTOM,
    user_id="system",
    data={"action": "file_uploaded", "filename": "report.pdf"},
)
await kit.realtime.publish_to_room("room-1", event)

RealtimeBackend ABC

For distributed deployments, implement a custom backend (e.g., Redis pub/sub):

from __future__ import annotations

from roomkit.realtime import EphemeralCallback, EphemeralEvent, RealtimeBackend


class RedisRealtimeBackend(RealtimeBackend):
    def __init__(self, redis_url: str) -> None:
        self._redis_url = redis_url

    async def publish(self, channel: str, event: EphemeralEvent) -> None:
        await self._redis.publish(channel, event.to_dict())

    async def subscribe(self, channel: str, callback: EphemeralCallback) -> str:
        ...  # Subscribe to Redis channel, return subscription ID

    async def unsubscribe(self, subscription_id: str) -> bool:
        ...  # Unsubscribe from Redis channel

    async def close(self) -> None:
        ...  # Clean up Redis connections


kit = RoomKit(realtime=RedisRealtimeBackend("redis://localhost:6379"))

EphemeralEvent.to_dict() and EphemeralEvent.from_dict() handle JSON serialization for transport.

InMemoryRealtime Details

The default InMemoryRealtime is designed for single-process deployments:

Property Detail
Queue overflow LRU-style — oldest events dropped when queue fills (default: 100)
Background tasks Each subscription has its own asyncio task
Error resilience Callback errors logged, subscription continues
Cleanup close() cancels all tasks. RoomKit.close() calls this

Adjust the queue size:

from roomkit.realtime import InMemoryRealtime

kit = RoomKit(realtime=InMemoryRealtime(max_queue_size=500))

Combining Ephemeral and Persistent

A typical pattern uses both for responsiveness and durability:

from __future__ import annotations

from roomkit import RoomKit
from roomkit.realtime import EphemeralEvent, EphemeralEventType


async def handle_event(kit: RoomKit, room_id: str, event: EphemeralEvent) -> None:
    if event.type == EphemeralEventType.READ_RECEIPT:
        # Ephemeral: update UI immediately
        await send_to_websocket({"type": "seen", "user": event.user_id})
        # Persistent: also store the read state
        await kit.mark_read(room_id, f"ws-{event.user_id}", event.data["event_id"])