Realtime Features¶
RoomKit supports two event categories: persistent events stored in the ConversationStore (messages, edits, deletions) and ephemeral events delivered in real time through the RealtimeBackend (typing indicators, presence, reactions, read receipts). This guide covers both.
Event Categories¶
| Category | Stored | Delivered via | Examples |
|---|---|---|---|
| Persistent | Yes | process_inbound() pipeline + broadcast | Messages, edits, deletions |
| Ephemeral | No | RealtimeBackend pub/sub | Typing, presence, reactions, tool calls |
Typing Indicators¶
from __future__ import annotations
from roomkit import RoomKit
kit = RoomKit()
# User starts typing
await kit.publish_typing("room-1", "alice", is_typing=True)
# User stops typing
await kit.publish_typing("room-1", "alice", is_typing=False)
# With additional metadata
await kit.publish_typing("room-1", "alice", is_typing=True, data={"name": "Alice"})
Presence Tracking¶
Three standard statuses map to dedicated event types:
await kit.publish_presence("room-1", "alice", "online")
await kit.publish_presence("room-1", "alice", "away")
await kit.publish_presence("room-1", "alice", "offline")
# Custom status → EphemeralEventType.CUSTOM with data={"status": "in_meeting"}
await kit.publish_presence("room-1", "alice", "in_meeting")
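The status-to-event-type mapping described above can be sketched as a small lookup. This is an illustration only: `PresenceType` and `resolve_presence` are hypothetical stand-ins defined here, not RoomKit's API, and the empty payload for the standard statuses is an assumption.

```python
from __future__ import annotations

from enum import Enum


class PresenceType(str, Enum):
    """Hypothetical stand-in for the presence-related EphemeralEventType members."""

    PRESENCE_ONLINE = "presence_online"
    PRESENCE_AWAY = "presence_away"
    PRESENCE_OFFLINE = "presence_offline"
    CUSTOM = "custom"


# The three standard statuses each map to a dedicated event type.
_STANDARD = {
    "online": PresenceType.PRESENCE_ONLINE,
    "away": PresenceType.PRESENCE_AWAY,
    "offline": PresenceType.PRESENCE_OFFLINE,
}


def resolve_presence(status: str) -> tuple[PresenceType, dict[str, str]]:
    """Return the event type and data payload for a presence status."""
    if status in _STANDARD:
        return _STANDARD[status], {}
    # Anything else becomes a custom event that carries the raw status.
    return PresenceType.CUSTOM, {"status": status}
```

Under these assumptions, `resolve_presence("in_meeting")` yields the custom type with `{"status": "in_meeting"}` as its data, matching the comment in the snippet above.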
Reactions¶
Add or remove emoji reactions on messages:
# Add a reaction
await kit.publish_reaction("room-1", "alice", target_event_id, emoji="thumbsup")
# Remove a reaction
await kit.publish_reaction("room-1", "alice", target_event_id, emoji="thumbsup", action="remove")
Read Receipts¶
Two complementary mechanisms — ephemeral for UI, persistent for state:
# Ephemeral: "seen" indicator (not stored)
await kit.publish_read_receipt("room-1", "alice", event_id)
# Persistent: durable read state (stored in ConversationStore)
await kit.mark_read("room-1", "ws-alice", event_id)
await kit.mark_all_read("room-1", "ws-alice")
Note
publish_read_receipt takes a user_id, while mark_read takes a channel_id. Ephemeral receipts are user-facing; persistent state is per-channel.
Subscribing to Events¶
Subscribe to all ephemeral events in a room:
from __future__ import annotations
from roomkit import RoomKit
from roomkit.realtime import EphemeralEvent, EphemeralEventType
kit = RoomKit()
async def on_event(event: EphemeralEvent) -> None:
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")
    elif event.type == EphemeralEventType.PRESENCE_ONLINE:
        print(f"{event.user_id} came online")
    elif event.type == EphemeralEventType.REACTION:
        print(f"{event.user_id} reacted with {event.data['emoji']}")
    elif event.type == EphemeralEventType.TOOL_CALL_START:
        tools = event.data["tool_calls"]
        print(f"AI calling: {[t['name'] for t in tools]}")
sub_id = await kit.subscribe_room("room-1", on_event)
# Later:
await kit.unsubscribe_room(sub_id)
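As the number of handled event types grows, a dispatch table may be easier to maintain than one long if/elif chain. The sketch below shows that pattern with stand-ins so it runs without RoomKit: `Event`, the string type names, and the `log` list are all hypothetical and only mirror the fields used in the callback above.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class Event:
    """Hypothetical stand-in for EphemeralEvent (only the fields used here)."""

    type: str
    user_id: str
    data: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Event], Awaitable[None]]

# Collects output so the example is easy to inspect; a real app would update the UI.
log: list[str] = []


async def on_typing(event: Event) -> None:
    log.append(f"{event.user_id} is typing...")


async def on_reaction(event: Event) -> None:
    log.append(f"{event.user_id} reacted with {event.data['emoji']}")


# One entry per handled event type; the keys are assumed names, not RoomKit values.
HANDLERS: dict[str, Handler] = {
    "typing_start": on_typing,
    "reaction": on_reaction,
}


async def dispatch(event: Event) -> None:
    """Route an event to its handler; unhandled types are ignored."""
    handler = HANDLERS.get(event.type)
    if handler is not None:
        await handler(event)


if __name__ == "__main__":
    asyncio.run(dispatch(Event("typing_start", "alice")))
    print(log)
```

In a real application, `dispatch` could be passed as the callback to `subscribe_room`, with the table keyed on `EphemeralEventType` members instead of strings.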
EphemeralEventType¶
All 12 ephemeral event types:
| Type | Published by |
|---|---|
| TYPING_START | publish_typing(is_typing=True) |
| TYPING_STOP | publish_typing(is_typing=False) |
| PRESENCE_ONLINE | publish_presence(status="online") |
| PRESENCE_AWAY | publish_presence(status="away") |
| PRESENCE_OFFLINE | publish_presence(status="offline") |
| READ_RECEIPT | publish_read_receipt() |
| REACTION | publish_reaction() |
| TOOL_CALL_START | Automatic from AIChannel |
| TOOL_CALL_END | Automatic from AIChannel |
| THINKING_START | Automatic from AIChannel |
| THINKING_END | Automatic from AIChannel |
| CUSTOM | Direct publish |
Message Editing¶
Edits are persistent events — stored and broadcast to all channels:
from __future__ import annotations
from roomkit import EventType, InboundMessage, RoomKit, TextContent
from roomkit.models.events import EditContent
kit = RoomKit()
await kit.process_inbound(InboundMessage(
    channel_id="ws-alice",
    sender_id="alice",
    event_type=EventType.EDIT,
    content=EditContent(
        target_event_id=original_msg_id,
        new_content=TextContent(body="Fixed typo!"),
    ),
))
The new_content field accepts any EventContent type — you can change a text message to rich content or vice versa.
Message Deletion¶
Deletions are also persistent events:
from __future__ import annotations
from roomkit import DeleteType, EventType, InboundMessage, RoomKit
from roomkit.models.events import DeleteContent
kit = RoomKit()
await kit.process_inbound(InboundMessage(
    channel_id="ws-alice",
    sender_id="alice",
    event_type=EventType.DELETE,
    content=DeleteContent(
        target_event_id=msg_id,
        delete_type=DeleteType.SENDER,
        reason="Changed my mind",
    ),
))
| DeleteType | Meaning |
|---|---|
| SENDER | Original sender deletes their message |
| SYSTEM | System-initiated (e.g., content policy) |
| ADMIN | Admin/moderator removal |
Tool Call Events¶
AIChannel automatically publishes TOOL_CALL_START and TOOL_CALL_END — subscribe to observe:
async def on_tool(event: EphemeralEvent) -> None:
    if event.type == EphemeralEventType.TOOL_CALL_START:
        tools = event.data["tool_calls"]
        print(f"Calling: {[t['name'] for t in tools]}")
    elif event.type == EphemeralEventType.TOOL_CALL_END:
        print(f"Completed in {event.data.get('duration_ms')}ms")
sub_id = await kit.subscribe_room("room-1", on_tool)
Use these to display "AI is searching..." indicators in a chat UI.
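One way to turn these two events into an indicator is a small state holder that remembers which tools are in flight. The sketch below is an assumption-laden illustration: `ToolIndicator` is a hypothetical helper, the event types are passed as plain strings, and the payload shape (`tool_calls` entries with a `name` key) is taken from the callback above.

```python
from __future__ import annotations


class ToolIndicator:
    """Tracks whether a tool call is in flight and builds a label for the UI."""

    def __init__(self) -> None:
        self._active: list[str] = []

    def handle(self, event_type: str, data: dict) -> str | None:
        """Update state from an event; return the label to show, or None to hide."""
        if event_type == "tool_call_start":
            # Remember the names of the tools that just started.
            self._active = [t["name"] for t in data.get("tool_calls", [])]
        elif event_type == "tool_call_end":
            # The call finished, so the indicator should disappear.
            self._active = []
        if not self._active:
            return None
        return f"AI is running {', '.join(self._active)}..."
```

A subscriber callback could call `handle()` for each event and forward the returned label (or its absence) to the chat UI.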
Custom Ephemeral Events¶
For application-specific events:
from __future__ import annotations
from roomkit import RoomKit
from roomkit.realtime import EphemeralEvent, EphemeralEventType
kit = RoomKit()
event = EphemeralEvent(
    room_id="room-1",
    type=EphemeralEventType.CUSTOM,
    user_id="system",
    data={"action": "file_uploaded", "filename": "report.pdf"},
)
await kit.realtime.publish_to_room("room-1", event)
RealtimeBackend ABC¶
For distributed deployments, implement a custom backend (e.g., Redis pub/sub):
from __future__ import annotations
import json
import redis.asyncio as redis  # assumes the redis-py package (4.2+) is installed
from roomkit.realtime import EphemeralCallback, EphemeralEvent, RealtimeBackend
class RedisRealtimeBackend(RealtimeBackend):
    def __init__(self, redis_url: str) -> None:
        self._redis_url = redis_url
        # Client used by publish() below
        self._redis = redis.from_url(redis_url)

    async def publish(self, channel: str, event: EphemeralEvent) -> None:
        # Redis payloads must be str or bytes, so encode the dict as JSON
        await self._redis.publish(channel, json.dumps(event.to_dict()))

    async def subscribe(self, channel: str, callback: EphemeralCallback) -> str:
        ...  # Subscribe to Redis channel, return subscription ID

    async def unsubscribe(self, subscription_id: str) -> bool:
        ...  # Unsubscribe from Redis channel

    async def close(self) -> None:
        ...  # Clean up Redis connections
kit = RoomKit(realtime=RedisRealtimeBackend("redis://localhost:6379"))
EphemeralEvent.to_dict() and EphemeralEvent.from_dict() handle JSON serialization for transport.
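The round trip through a string-based transport can be sketched as follows. `WireEvent` is a hypothetical stand-in for `EphemeralEvent` so the example runs without RoomKit; the real class may carry more fields, and `encode`/`decode` are illustrative helper names.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class WireEvent:
    """Hypothetical stand-in for EphemeralEvent; the real field set may differ."""

    room_id: str
    type: str
    user_id: str
    data: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, raw: dict[str, Any]) -> WireEvent:
        return cls(**raw)


def encode(event: WireEvent) -> str:
    """Serialize for a transport that carries strings (e.g. a pub/sub channel)."""
    return json.dumps(event.to_dict())


def decode(payload: str | bytes) -> WireEvent:
    """Rebuild the event on the subscriber side."""
    return WireEvent.from_dict(json.loads(payload))
```

A custom backend would typically call something like `encode` in `publish()` and `decode` in the listener that feeds subscriber callbacks.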
InMemoryRealtime Details¶
The default InMemoryRealtime is designed for single-process deployments:
| Property | Detail |
|---|---|
| Queue overflow | Drop-oldest: when the queue fills (default size: 100), the oldest events are discarded first |
| Background tasks | Each subscription has its own asyncio task |
| Error resilience | Callback errors logged, subscription continues |
| Cleanup | close() cancels all tasks. RoomKit.close() calls this |
Adjust the queue size:
from roomkit import RoomKit
from roomkit.realtime import InMemoryRealtime
kit = RoomKit(realtime=InMemoryRealtime(max_queue_size=500))
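The overflow behavior in the table can be illustrated with a bounded buffer that evicts its oldest entry when full. This is a minimal sketch of the drop-oldest policy only, not InMemoryRealtime's actual implementation; `DropOldestQueue` and its `dropped` counter are hypothetical.

```python
from __future__ import annotations

from collections import deque


class DropOldestQueue:
    """Bounded buffer that discards the oldest item once it is full."""

    def __init__(self, max_queue_size: int = 100) -> None:
        # A deque with maxlen evicts from the opposite end on append.
        self._items: deque[str] = deque(maxlen=max_queue_size)
        self.dropped = 0

    def put(self, item: str) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # the append below will evict the oldest item
        self._items.append(item)

    def drain(self) -> list[str]:
        """Return buffered items oldest-first and empty the buffer."""
        items = list(self._items)
        self._items.clear()
        return items
```

With this policy a slow subscriber loses the stalest events but memory stays bounded, which is usually an acceptable trade for ephemeral signals such as typing indicators.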
Combining Ephemeral and Persistent¶
A typical pattern uses both for responsiveness and durability:
from __future__ import annotations
from roomkit import RoomKit
from roomkit.realtime import EphemeralEvent, EphemeralEventType
async def handle_event(kit: RoomKit, room_id: str, event: EphemeralEvent) -> None:
    if event.type == EphemeralEventType.READ_RECEIPT:
        # Ephemeral: update UI immediately
        # (send_to_websocket is an application-defined helper, not imported here)
        await send_to_websocket({"type": "seen", "user": event.user_id})
        # Persistent: also store the read state
        await kit.mark_read(room_id, f"ws-{event.user_id}", event.data["event_id"])