Skip to content

Status Bus

Share real-time status updates between agents in a multi-agent setup. When an execution agent completes a task, the voice agent is notified immediately — no polling needed.

Quick start

from roomkit import RoomKit
from roomkit.orchestration.status_bus import StatusBus

# Create a bus (in-memory with optional JSONL persistence)
bus = StatusBus(persist_path="/tmp/session.jsonl")

# Any agent can post status updates
bus.post("exec", "search_google", "ok", detail="Found 7 results")
bus.post("exec", "click_result", "ok", detail="Clicked RoomKit link")
bus.post("system", "task_completed", "completed", detail="Done in 48s")

# Any agent can read recent activity
entries = await bus.recent(5)
text = await bus.recent_text(5)
# → [19:05:15] exec: search_google → ok | Found 7 results
# → [19:05:20] exec: click_result → ok | Clicked RoomKit link
# → [19:05:22] system: task_completed → completed | Done in 48s

# Subscribe for real-time notifications
async def on_status(entry):
    if entry.status == "completed":
        print(f"Task done: {entry.detail}")

await bus.subscribe(on_status)

Framework integration

The StatusBus is wired into RoomKit automatically — no manual setup needed:

from roomkit import RoomKit
from roomkit.orchestration.status_bus import StatusBus

# Default in-memory bus (always available)
kit = RoomKit()
kit.status_bus.post("exec", "search_google", "ok", detail="Found 7 results")

# Custom bus with persistence
kit = RoomKit(status_bus=StatusBus(persist_path="/tmp/session.jsonl"))

Framework events

Every status post emits a status_posted framework event:

@kit.on("status_posted")
async def on_status(event):
    entry = event.data  # dict with ts, agent_id, action, status, detail, metadata
    if entry["status"] == "completed":
        print(f"Task done: {entry['detail']}")

The bus is closed automatically when kit.close() is called.

How it works

The StatusBus is a shared event log with pub/sub. All agents in a room write to it, and any agent can subscribe to be notified when new entries arrive.

Voice Agent ──post──→ StatusBus ──notify──→ Exec Agent
Exec Agent  ──post──→ StatusBus ──notify──→ Voice Agent
System      ──post──→ StatusBus ──notify──→ All subscribers

Status entry

Each entry has:

Field Type Description
ts str ISO timestamp
agent_id str Who posted (e.g. "exec", "voice")
action str What happened (e.g. "search_google", "click_result")
status str Outcome: ok, failed, pending, info, completed
detail str Human-readable description
metadata dict Optional structured data

Sync and async posting

# Sync (fire-and-forget, safe from any context)
bus.post("exec", "search_google", "ok", detail="Found results")

# Async (awaits subscriber notification)
await bus.post_async("exec", "search_google", "ok", detail="Found results")

Pluggable backends

The default InMemoryStatusBackend works for single-process setups. For distributed deployments, implement the StatusBackend ABC:

from roomkit.orchestration.status_bus import StatusBackend, StatusEntry

class RedisStatusBackend(StatusBackend):
    async def publish(self, entry: StatusEntry) -> None:
        await self.redis.publish("status", entry.to_dict())

    async def recent(self, n, *, agent_id=None, status=None):
        # Read from Redis sorted set
        ...

    async def subscribe(self, callback):
        # Redis pub/sub
        ...

    async def unsubscribe(self, callback):
        ...

bus = StatusBus(backend=RedisStatusBackend("redis://localhost:6379"))

Integration with voice agents

Auto-inject status updates into a RealtimeVoiceChannel so the voice agent knows what the execution agent is doing:

async def on_exec_status(entry):
    if entry.agent_id == "exec" and entry.status in ("info", "completed"):
        for session in voice_channel.get_room_sessions("my-room"):
            await voice_channel.inject_text(
                session,
                f"[Agent update] {entry.action}: {entry.detail}",
                role="user",
                silent=True,
            )

await bus.subscribe(on_exec_status)

Delegation guard

Prevent re-delegation when a task was just completed:

completed = await bus.recent(1, status="completed")
if completed:
    age = (now - completed[0].ts).total_seconds()
    if age < 5:
        return "Task already completed. Ask the user if they need something different."

JSONL persistence

Pass persist_path to persist entries to a JSONL file:

bus = StatusBus(persist_path="/tmp/screen_ai/session.jsonl")

Each entry is appended as one JSON line. Useful for debugging and audit trails.

await bus.print_summary()
Status Log
============================================================
   1. [+] exec   search_google("roomkit conversation AI")
      Found 15 text elements on results page
   2. [+] exec   click_result(118)
      Clicked "roomkit.live" at (274,313)
   3. [*] system task_completed
      Duration: 48303ms

  Total: 3 entries (2 ok, 0 failed, 1 completed)