Guardrails in RoomKit: Composable Safety for Multi-Channel AI

March 26, 2026 · 18 min read

How RoomKit turns AI safety from a monolithic afterthought into layered, composable primitives you wire together like middleware.


If you've built AI-powered applications, you know the pattern: you ship the happy path first, then bolt on safety checks as an afterthought: a regex here, a moderation API there, a prayer to the demo gods everywhere else.

RoomKit takes a different approach. Guardrails aren't a feature you add later. They're composable primitives (hooks, policies, rate limits, circuit breakers) that you stack at every stage of the message lifecycle. Think of them as middleware for AI safety, except the pipeline has seven distinct interception points, not just "before" and "after."

The Pipeline

Every message in RoomKit flows through a well-defined pipeline. Guardrails can intercept at each stage:

Inbound Message
  → [Input Guardrails]        ← Block, modify, or redact before AI sees it
  → AI Channel processing
    → [Tool Guardrails]       ← Control which tools AI can call
    → [Processing Limits]     ← Prevent runaway loops and token burn
  → [Output Guardrails]       ← Filter AI responses before delivery
  → EventRouter.broadcast()
    → [Channel Guardrails]    ← Per-channel rate limits and permissions
  → [Audit]                   ← Async observation for compliance

The key insight: the same hook mechanism handles both input and output filtering. RoomKit's BEFORE_BROADCAST hook fires for every event entering the broadcast pipeline, whether it comes from a user or from an AI channel re-entering with its response. You distinguish them by checking the event source.

Layer 1: Input Guardrails

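Input guardrails intercept messages before they reach AI channels. They use BEFORE_BROADCAST hooks with synchronous execution, running in priority order. Each hook returns one of three results: HookResult.allow() to pass the event through unchanged, HookResult.block(reason=...) to reject it, or HookResult.modify(event) to replace it with an altered copy.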

Blocking Harmful Content

The simplest guardrail is a content filter. Here's one that blocks messages containing prohibited words:

@kit.hook(HookTrigger.BEFORE_BROADCAST, name="toxicity_filter", priority=0)
async def toxicity_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if isinstance(event.content, TextContent):
        blocked_words = {"badword", "spam", "scam"}
        # Whitespace tokenization: punctuation stays attached to words
        words = set(event.content.body.lower().split())
        hits = words & blocked_words
        if hits:
            # sorted() keeps the reason string stable across runs
            return HookResult.block(
                reason=f"Blocked: prohibited words {sorted(hits)}"
            )
    return HookResult.allow()

When blocked, the reason is stored in the InboundResult returned by kit.process_inbound(), so your application layer knows exactly why a message was rejected.
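
One caveat with whitespace splitting: "spam!" or "Scam." would slip past, because the punctuation stays attached to the token. Here is a minimal, framework-free sketch of a punctuation-aware variant. The helper `find_blocked_words` is hypothetical (it is not part of RoomKit) and could be called from inside the hook in place of the `split()` logic.

```python
import re

BLOCKED_WORDS = frozenset({"badword", "spam", "scam"})

# Letters, digits, and apostrophes count as word characters; everything
# else (spaces, punctuation) acts as a separator.
_WORD_RE = re.compile(r"[a-z0-9']+")


def find_blocked_words(text: str, blocked: frozenset = BLOCKED_WORDS) -> set:
    """Return the blocked words present in text, ignoring case and punctuation."""
    tokens = set(_WORD_RE.findall(text.lower()))
    return tokens & blocked
```

This still matches whole words only, so "spammer" is not flagged. Whether that is the right trade-off depends on your content policy.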

Redacting PII

Sometimes you don't want to block a message; you want to sanitize it. PII redaction is a classic example: the user's intent is fine, but they accidentally included a phone number or SSN.

import re

PII_PATTERNS = {
    "phone": re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "email": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"),
    "credit_card": re.compile(r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b"),
}

@kit.hook(HookTrigger.BEFORE_BROADCAST, name="pii_redactor", priority=1)
async def pii_redactor(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if not isinstance(event.content, TextContent):
        return HookResult.allow()

    text = event.content.body
    changed = False
    for label, pattern in PII_PATTERNS.items():
        new_text = pattern.sub(f"[{label.upper()}_REDACTED]", text)
        if new_text != text:
            text = new_text
            changed = True

    if changed:
        modified = event.model_copy(update={"content": TextContent(body=text)})
        return HookResult.modify(modified)
    return HookResult.allow()

Notice the priority ordering: lower numbers run first, so the toxicity filter (priority 0) runs before PII redaction (priority 1). Blocked messages are never processed further, so there's no point redacting PII in a message that's already been rejected.
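
To make the ordering and short-circuit behavior concrete, here is a small stand-alone sketch of a priority-ordered hook runner. It is not RoomKit's implementation: `Result`, `run_hooks`, and the two toy hooks are hypothetical stand-ins, and events are plain strings to keep the example self-contained.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Result:
    """Hypothetical stand-in for HookResult."""
    action: str                   # "allow", "block", or "modify"
    event: Optional[str] = None   # replacement event for "modify"
    reason: Optional[str] = None  # explanation for "block"


Hook = Callable[[str], Awaitable[Result]]


async def run_hooks(event: str, hooks: list) -> Result:
    """Run (priority, hook) pairs in ascending priority; stop at the first block."""
    current = event
    for _, hook in sorted(hooks, key=lambda pair: pair[0]):
        result = await hook(current)
        if result.action == "block":
            return result              # later hooks never run
        if result.action == "modify" and result.event is not None:
            current = result.event     # the next hook sees the modified event
    return Result("allow", event=current)


async def toxicity(event: str) -> Result:
    if "spam" in event.lower().split():
        return Result("block", reason="prohibited word")
    return Result("allow")


async def redact(event: str) -> Result:
    return Result("modify", event=event.replace("555-1234", "[PHONE_REDACTED]"))


# Registration order does not matter; priority decides execution order.
HOOKS = [(1, redact), (0, toxicity)]
```

Calling `asyncio.run(run_hooks("call 555-1234", HOOKS))` yields an allowed event with the number redacted, while a message containing "spam" is blocked before the redactor ever runs.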

Jailbreak Detection

Pattern-based jailbreak detection is the first line of defense. It won't catch sophisticated attempts, but it handles the obvious ones:

JAILBREAK_PATTERNS = [
    "ignore previous instructions",
    "you are now",
    "pretend you are",
    "act as if you have no restrictions",
    "bypass your guidelines",
]

@kit.hook(HookTrigger.BEFORE_BROADCAST, name="jailbreak_detector", priority=0)
async def jailbreak_detector(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if isinstance(event.content, TextContent):
        text = event.content.body.lower()
        for pattern in JAILBREAK_PATTERNS:
            if pattern in text:
                return HookResult.block(reason=f"Jailbreak attempt: '{pattern}'")
    return HookResult.allow()

For production systems, you'll want to call an external moderation API (OpenAI Moderation, Amazon Bedrock Guardrails, Llama Guard) inside the hook for ML-powered classification. RoomKit hooks support timeout parameters to prevent slow external services from blocking the pipeline:

@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    name="openai_moderation",
    priority=0,
    timeout=5.0,  # Don't let slow APIs stall the pipeline
)
async def openai_moderation(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Call external moderation API...
    ...

If the hook times out, the event is allowed by default: fail-open, not fail-closed. This is a deliberate design choice: in a real-time conversation, a stuck moderation API shouldn't freeze the entire experience.
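
The fail-open behavior can be sketched with nothing but asyncio. This is an illustration of the idea, not RoomKit's internals: `run_with_timeout` and both moderation functions are hypothetical, and a hook's verdict is reduced to a boolean (True means allow).

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_timeout(
    hook: Callable[[str], Awaitable[bool]],
    event: str,
    timeout: float,
) -> bool:
    """Return the hook's verdict, or True (allow) if the hook times out."""
    try:
        return await asyncio.wait_for(hook(event), timeout=timeout)
    except asyncio.TimeoutError:
        return True  # fail-open: a stalled check does not block the event


async def slow_moderation(event: str) -> bool:
    await asyncio.sleep(0.2)  # simulates a stalled external API
    return False              # would have blocked, had it answered in time


async def fast_moderation(event: str) -> bool:
    return "attack" not in event
```

With a 10 ms budget, `slow_moderation` never gets to deliver its verdict and the event goes through. That is exactly the trade-off described above: availability over strictness.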

Layer 2: Tool Guardrails

AI agents are only as safe as the tools they can call. RoomKit provides two mechanisms for tool safety: declarative policies and runtime auditing.

Tool Policies

ToolPolicy is a declarative allow/deny system with glob patterns and role-based overrides:

policy = ToolPolicy(
    allow=["get_weather", "search_*", "lookup_*"],
    deny=["delete_*", "admin_*"],
    role_overrides={
        "supervisor": RoleOverride(
            allow=["delete_*"],
            mode="replace",      # Fully override base policy
        ),
        "observer": RoleOverride(
            allow=["search_*"],
            mode="restrict",     # Intersect with base allow
        ),
    },
)

The resolution is deterministic: deny always wins over allow, and empty lists mean "permit all." Role overrides can either restrict (intersect with the base policy) or replace (override it completely).
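
Here is one way those resolution rules could be implemented, using fnmatch for the glob patterns. Treat it as a sketch under assumptions: `Policy`, `Override`, and `is_allowed` are hypothetical names, and the exact semantics of "restrict" (the tool must pass both the base policy and the override) are my reading of the description above, not RoomKit's source.

```python
from dataclasses import dataclass, field
from fnmatch import fnmatchcase
from typing import Optional


@dataclass
class Override:
    allow: list = field(default_factory=list)
    deny: list = field(default_factory=list)
    mode: str = "restrict"  # "restrict" or "replace"


@dataclass
class Policy:
    allow: list = field(default_factory=list)
    deny: list = field(default_factory=list)
    role_overrides: dict = field(default_factory=dict)


def _matches(tool: str, patterns: list) -> bool:
    return any(fnmatchcase(tool, pattern) for pattern in patterns)


def _permitted(tool: str, allow: list, deny: list) -> bool:
    if _matches(tool, deny):
        return False                          # deny always wins
    return not allow or _matches(tool, allow)  # empty allow list permits all


def is_allowed(policy: Policy, tool: str, role: Optional[str] = None) -> bool:
    override = policy.role_overrides.get(role) if role else None
    if override is None:
        return _permitted(tool, policy.allow, policy.deny)
    if override.mode == "replace":
        # The override's lists are used instead of the base lists.
        return _permitted(tool, override.allow, override.deny)
    # "restrict": the tool must pass the base policy and the override.
    return _permitted(tool, policy.allow, policy.deny) and _permitted(
        tool, override.allow, override.deny
    )


POLICY = Policy(
    allow=["get_weather", "search_*", "lookup_*"],
    deny=["delete_*", "admin_*"],
    role_overrides={
        "supervisor": Override(allow=["delete_*"], mode="replace"),
        "observer": Override(allow=["search_*"], mode="restrict"),
    },
)
```

Under these assumptions, a supervisor can call `delete_user` but loses `get_weather` (the base allow list is replaced), while an observer keeps only the `search_*` tools.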

Runtime Tool Auditing

ON_TOOL_CALL hooks let you audit or conditionally block tool invocations based on runtime context, including the tool's arguments and not just its name:

@kit.hook(HookTrigger.ON_TOOL_CALL, name="tool_auditor")
async def tool_auditor(event: RoomEvent, ctx: RoomContext) -> HookResult:
    tool_name = event.metadata.get("tool_name", "unknown")
    arguments = event.metadata.get("arguments", {})

    if tool_name == "query_database" and "users" in arguments.get("table", ""):
        return HookResult.block(reason="Direct user table access not permitted")
    return HookResult.allow()

This is where static policies and dynamic checks complement each other: the policy says "you can call query_database," but the runtime hook says "not against the users table."

Layer 3: Processing Guardrails

Even with safe inputs and controlled tools, AI can still misbehave by looping indefinitely or burning through tokens.

Chain Depth Limits

When multiple AI channels respond to each other, you can get infinite loops. RoomKit enforces a configurable chain depth:

kit = RoomKit(max_chain_depth=3)

Beyond this depth, responses are blocked with EventStatus.BLOCKED and the reason "event_chain_depth_limit".
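
A rough sketch of how a depth counter stops two AI channels from ping-ponging forever. The `Event` class, `respond_to`, and `simulate_ping_pong` are hypothetical, and the sketch assumes a user message starts at depth 0 with each AI response one level deeper than the event it answers. Only the reason string comes from the text above.

```python
from dataclasses import dataclass
from typing import Optional

MAX_CHAIN_DEPTH = 3


@dataclass
class Event:
    body: str
    chain_depth: int = 0
    status: str = "delivered"
    blocked_by: Optional[str] = None


def respond_to(parent: Event, body: str, max_depth: int = MAX_CHAIN_DEPTH) -> Event:
    """Create a response one level deeper than its parent; block it if too deep."""
    child = Event(body=body, chain_depth=parent.chain_depth + 1)
    if child.chain_depth > max_depth:
        child.status = "blocked"
        child.blocked_by = "event_chain_depth_limit"
    return child


def simulate_ping_pong(rounds: int) -> list:
    """Two agents answering each other; returns the status of each reply."""
    event = Event("hello")  # user message at depth 0
    statuses = []
    for i in range(rounds):
        event = respond_to(event, f"reply {i}")
        statuses.append(event.status)
    return statuses
```

With a limit of 3, the first three replies go through and the fourth is blocked, which ends the loop.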

Tool Loop Limits

Runaway tool-calling loops are a real cost risk. RoomKit provides three knobs:

ai = AIChannel(
    "ai-assistant",
    provider=provider,
    max_tool_rounds=20,               # Max iterations (default: 200)
    tool_loop_timeout_seconds=30.0,   # Hard timeout (default: 300s)
    tool_loop_warn_after=10,          # Warning threshold (default: 50)
)
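
To show how the three knobs interact, here is a framework-free sketch of a bounded tool loop. `run_tool_loop` and its `step` callback are hypothetical; the real loop lives inside AIChannel and is not shown in this post. The parameter names simply mirror the options above.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger("tool_loop")


def run_tool_loop(
    step: Callable[[int], Optional[str]],
    max_rounds: int = 20,
    timeout_seconds: float = 30.0,
    warn_after: int = 10,
) -> tuple:
    """Call step(round_no) until it returns a final answer or a limit trips.

    Returns (answer, stop_reason), where stop_reason is "completed",
    "max_rounds", or "timeout".
    """
    deadline = time.monotonic() + timeout_seconds
    for round_no in range(1, max_rounds + 1):
        if time.monotonic() > deadline:
            return None, "timeout"
        if round_no == warn_after + 1:
            logger.warning("tool loop exceeded %d rounds", warn_after)
        answer = step(round_no)  # None means "the model asked for another tool call"
        if answer is not None:
            return answer, "completed"
    return None, "max_rounds"
```

The warning threshold changes nothing about control flow; it just gives you a log line to alert on before the hard limits kick in.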

Steering Directives

Sometimes you need to cancel an active generation mid-stream. A user hits "stop," or a supervisor intervenes:

ai_channel.steer(Cancel(reason="User requested stop"))

Token and Cost Control

Cap output length and thinking budget to keep costs predictable:

ai = AIChannel(
    "ai-assistant",
    provider=provider,
    max_tokens=512,
    thinking_budget=2000,
    max_context_events=30,
)

Layer 4: Output Guardrails

Here's where RoomKit's design pays off. AI responses re-enter the broadcast pipeline through the same BEFORE_BROADCAST hooks that filter user input. You distinguish AI output by checking the event source:

@kit.hook(HookTrigger.BEFORE_BROADCAST, name="output_filter", priority=10)
async def output_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if not event.source or not event.source.channel_id:
        return HookResult.allow()

    binding = ctx.get_binding(event.source.channel_id)
    if not binding or binding.category != ChannelCategory.INTELLIGENCE:
        return HookResult.allow()

    if isinstance(event.content, TextContent):
        text = event.content.body
        leak_indicators = ["my system prompt", "my instructions say"]
        if any(ind in text.lower() for ind in leak_indicators):
            replacement = event.model_copy(
                update={"content": TextContent(body="I can't share that information.")}
            )
            return HookResult.modify(replacement)
    return HookResult.allow()

Using a higher priority value (10 vs. 0) ensures the output filter runs after the input guardrails. The source check at the top of the hook returns early for anything that didn't come from an AI channel, so the filter only processes AI-generated events.

Layer 5: Channel Guardrails

Different channels have different constraints. SMS carriers reject certain content. WebSocket connections can handle higher throughput. Compliance observers should never send messages.

Permissions

await kit.attach_channel("room-1", "ws-user", access=Access.READ_WRITE)
await kit.attach_channel("room-1", "ws-monitor", access=Access.READ_ONLY)
await kit.attach_channel("room-1", "ws-audit", access=Access.WRITE_ONLY)

Rate Limiting

Per-channel rate limits use a token bucket algorithm. When exceeded, delivery is queued, not dropped:

await kit.attach_channel("room-1", "sms-main", rate_limit=RateLimit(max_per_second=2.0))
await kit.attach_channel("room-1", "ws-user", rate_limit=RateLimit(max_per_second=20.0))
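
For readers who haven't met the algorithm, here is a compact token bucket that queues overflow instead of dropping it. It is a sketch, not RoomKit's limiter: the `TokenBucket` class and its `burst` parameter are assumptions, and time is passed in explicitly so the behavior is easy to test.

```python
from collections import deque


class TokenBucket:
    """Token bucket that queues excess messages instead of dropping them."""

    def __init__(self, max_per_second: float, burst: int = 1) -> None:
        self.rate = max_per_second
        self.capacity = float(burst)
        self.tokens = float(burst)  # start with a full bucket
        self.last = 0.0
        self.queue = deque()

    def _refill(self, now: float) -> None:
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now

    def submit(self, message: str, now: float) -> list:
        """Enqueue a message and return everything deliverable at time `now`."""
        self.queue.append(message)
        return self.drain(now)

    def drain(self, now: float) -> list:
        """Deliver queued messages in order while tokens are available."""
        self._refill(now)
        delivered = []
        while self.queue and self.tokens >= 1.0:
            self.tokens -= 1.0
            delivered.append(self.queue.popleft())
        return delivered
```

At 2 messages per second with a burst of 1, three messages sent within 200 ms result in one immediate delivery and two queued messages that are released, in order, as tokens refill.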

Channel-Specific Policies

Hooks can target specific channel types, so you can enforce carrier content policies on SMS without penalizing internal WebSocket channels:

@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    name="sms_strict_filter",
    channel_types={ChannelType.SMS, ChannelType.RCS},
    priority=0,
)
async def sms_strict_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Enforce carrier-specific content rules
    ...

Circuit Breakers

RoomKit's EventRouter maintains per-channel circuit breakers automatically. When a channel accumulates consecutive delivery failures, the breaker opens and subsequent deliveries fail fast, protecting healthy channels from cascading failures.
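
The pattern itself is simple enough to sketch in a few lines. The `CircuitBreaker` class below is hypothetical: the threshold of 3 and the 30-second recovery window are illustrative values, and the half-open probe after the window is a common convention that this post does not confirm for RoomKit.

```python
from typing import Optional


class CircuitBreaker:
    """Opens after N consecutive failures; allows a probe after a cool-down."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self, now: float) -> bool:
        """Return True if a delivery attempt should be made at time `now`."""
        if self.opened_at is None:
            return True  # closed: deliver normally
        # Open: fail fast until the cool-down has elapsed, then allow a probe.
        return now - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None  # close the breaker

    def record_failure(self, now: float) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = now  # open (or re-open) the breaker
```

A router would call `allow()` before each delivery and report the outcome afterwards, so a dead SMS gateway costs one fast check instead of a full timeout on every message.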

Layer 6: Voice Guardrails

Voice AI adds unique challenges. Users can interrupt mid-sentence, background noise gets transcribed as gibberish, and TTS engines read markdown formatting aloud.

Interruption Control

config = InterruptionConfig(
    strategy=InterruptionStrategy.CONFIRMED,
    min_speech_ms=300,
    allow_during_first_ms=2000,
)

Four strategies cover the spectrum: IMMEDIATE for responsive conversations, CONFIRMED to avoid false triggers, SEMANTIC for backchannel detection ("uh-huh" vs. real interruptions), and DISABLED for safety-critical messages that must be heard in full.

Transcript Filtering

The ON_TRANSCRIPTION hook filters noise before it reaches the AI:

@kit.hook(HookTrigger.ON_TRANSCRIPTION, name="transcript_filter")
async def transcript_filter(event: TranscriptionEvent, ctx: RoomContext) -> HookResult:
    text = event.text.strip()
    if len(text) < 3:
        return HookResult.block(reason="Utterance too short")
    fillers = {"um", "uh", "hmm", "ah"}
    if set(text.lower().split()) <= fillers:
        return HookResult.block(reason="Filler speech only")
    return HookResult.allow()

Pre-TTS Sanitization

Strip markdown formatting before it becomes spoken words:

@kit.hook(HookTrigger.BEFORE_TTS, name="tts_sanitizer")
async def tts_sanitizer(event: str, ctx: RoomContext) -> HookResult:
    text = re.sub(r"\*\*(.+?)\*\*", r"\1", event)   # **bold**
    text = re.sub(r"\[(.+?)\]\(.+?\)", r"\1", text)  # [link](url)
    if text != event:
        return HookResult.modify(text)
    return HookResult.allow()

Layer 7: Audit

Every guardrail decision should be logged. RoomKit's async hooks are fire-and-forget and never block the pipeline:

@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC, name="audit")
async def audit(event: RoomEvent, ctx: RoomContext) -> None:
    logger.info("Event %s in room %s from %s", event.id, ctx.room.id, event.source)
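
If "fire-and-forget" is unfamiliar, this asyncio sketch shows the idea: the broadcast returns before the audit finishes. The names `broadcast`, `audit`, and `demo` are hypothetical and say nothing about how RoomKit schedules its async hooks internally.

```python
import asyncio

AUDIT_LOG = []
_background = set()  # strong references to in-flight audit tasks


async def audit(event_id: str) -> None:
    await asyncio.sleep(0.05)  # simulates a slow audit sink
    AUDIT_LOG.append(event_id)


async def broadcast(event_id: str) -> str:
    task = asyncio.create_task(audit(event_id))
    _background.add(task)  # keep a reference so the task is not garbage-collected
    task.add_done_callback(_background.discard)
    return f"delivered {event_id}"  # returns without awaiting the audit


async def demo() -> tuple:
    result = await broadcast("evt-1")
    logged_at_return = len(AUDIT_LOG)   # the audit has not finished yet
    await asyncio.gather(*_background)  # drain pending audits before shutdown
    return result, logged_at_return, len(AUDIT_LOG)
```

The final `gather` matters in practice: if the process exits while audit tasks are still pending, those records are lost, so drain them on shutdown.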

For regulated industries (our primary use case at TchatNSign is financial advisory under AMF/CIRO compliance), this audit trail isn't optional. It's the whole point.

Composing It All Together

Here's what a production guardrail stack looks like:

kit = RoomKit(max_chain_depth=3)

# Priority 0: Block toxic content
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="toxicity_filter", priority=0)
async def toxicity_filter(event, ctx): ...

# Priority 1: Redact PII
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="pii_redactor", priority=1)
async def pii_redactor(event, ctx): ...

# Priority 10: Filter AI output
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="output_guard", priority=10)
async def output_guard(event, ctx): ...

# Async: Audit everything
@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC, name="audit")
async def audit(event, ctx): ...

# Tool policy
policy = ToolPolicy(
    allow=["get_weather", "search_*"],
    deny=["delete_*", "admin_*"],
)

# AI channel with processing limits
ai = AIChannel(
    "ai-assistant",
    provider=provider,
    tool_policy=policy,
    max_tool_rounds=20,
    tool_loop_timeout_seconds=30.0,
    max_tokens=1024,
)

# Per-channel rate limits
await kit.attach_channel("room-1", "ws-user", rate_limit=RateLimit(max_per_second=5.0))

Each layer is independent. Add a jailbreak detector without touching PII redaction. Swap the moderation API without changing tool policies. Remove rate limits on internal channels without affecting SMS constraints.

Hook Reference

Hook              Sync/Async  Can Block/Modify  Use Case
BEFORE_BROADCAST  Sync        Yes               Input/output filtering, PII, jailbreak
ON_TOOL_CALL      Sync        Yes               Tool auditing, conditional blocking
ON_TRANSCRIPTION  Sync        Yes               Transcript filtering
BEFORE_TTS        Sync        Yes               TTS text sanitization
ON_AI_RESPONSE    Async       No                Monitoring, latency tracking
BEFORE_DELIVER    Async       No                Delivery observation
AFTER_BROADCAST   Async       No                Audit, analytics, compliance

The Philosophy

RoomKit's guardrail system reflects a core belief: safety is not a feature, it's an architecture. You don't add guardrails to a working system. You design a system where guardrails are the natural way messages flow.

Every hook is a function. Every policy is data. Every limit is a number. Composable, testable, auditable. No magic, no hidden state, no "trust me, it's safe."

That's the kind of safety you can actually ship to production.


RoomKit is an open-source Python framework for multi-channel conversation orchestration. Check it out on GitHub or read the full documentation.