Skip to content

Production Resilience

RoomKit provides built-in patterns for production deployments: circuit breakers, retry policies, rate limiting, delivery tracking, and room lifecycle timers.

Circuit Breaker

Prevents cascading failures by fast-failing requests to unhealthy providers:

from __future__ import annotations

from roomkit.models.delivery import CircuitBreaker

breaker = CircuitBreaker(
    failure_threshold=5,      # Open after 5 consecutive failures
    recovery_timeout=30.0,    # Try again after 30 seconds
    half_open_max=2,          # Allow 2 test requests in half-open state
)

States

CLOSED (normal)
  │ failure_threshold consecutive failures
OPEN (fast-fail)
  │ recovery_timeout elapsed
HALF_OPEN (testing)
  ├─ half_open_max successes → CLOSED
  └─ any failure → OPEN
State Behavior
CLOSED Normal operation. Failures increment the counter
OPEN All requests fail immediately without calling the provider
HALF_OPEN Limited test requests allowed. Success resets to CLOSED, failure reopens
Parameter Default Description
failure_threshold 5 Consecutive failures before opening
recovery_timeout 30.0 Seconds before trying again
half_open_max 2 Test requests allowed in half-open

Retry Policy

Exponential backoff with configurable limits:

from __future__ import annotations

from roomkit.models.channel import RetryPolicy

policy = RetryPolicy(
    max_retries=3,
    base_delay_seconds=1.0,
    exponential_base=2.0,
    max_delay_seconds=30.0,
)

Delay formula: min(base_delay * exponential_base^attempt, max_delay)

Attempt Delay
1 1.0s
2 2.0s
3 4.0s
4 8.0s (capped at max_delay)

Used by AIChannel for provider calls (see AI Steering guide for fallback chains) and the delivery layer for transport providers.

Tip

Only retryable errors (5xx, timeouts) trigger retries. Non-retryable errors (4xx) fail immediately.

Rate Limiting

Token bucket rate limiting for inbound messages:

from __future__ import annotations

from roomkit import RoomKit
from roomkit.channels import SMSChannel
from roomkit.core.rate_limit import RateLimit, TokenBucketRateLimiter

limiter = TokenBucketRateLimiter()
kit = RoomKit(rate_limiter=limiter)

sms = SMSChannel(
    "sms",
    provider=twilio_provider,
    rate_limit=RateLimit(
        requests_per_second=10,
        burst_size=20,
    ),
)
Parameter Description
requests_per_second Sustained rate limit
burst_size Maximum burst above sustained rate

The token bucket algorithm allows short bursts up to burst_size while enforcing the average rate over time. When the limit is exceeded, inbound messages are rejected before processing.

Delivery Status Tracking

Track message delivery through three modes:

from __future__ import annotations

from roomkit import HookTrigger, RoomKit
from roomkit.models.delivery import DeliveryMode

kit = RoomKit()

# Configure delivery mode per channel binding
await kit.attach_channel("room-1", "sms-main", metadata={
    "delivery_mode": DeliveryMode.REQUIRE_DELIVERY,
})
Mode Behavior
FIRE_AND_FORGET Send and don't wait for confirmation
REQUIRE_DELIVERY Track delivery status, retry on failure
REQUIRE_ACK Require acknowledgement from recipient

Delivery Status Hook

@kit.hook(HookTrigger.ON_DELIVERY_STATUS)
async def on_delivery(event, ctx):
    # event contains DeliveryStatus with provider_id, status, timestamp
    if event.status == "failed":
        logger.error(f"Delivery failed to {event.channel_id}: {event.error}")
    elif event.status == "delivered":
        logger.info(f"Delivered to {event.channel_id}")

Room Lifecycle Timers

Automatically manage room state with timers:

from __future__ import annotations

from roomkit import HookTrigger, RoomKit
from roomkit.models.room import RoomTimers

kit = RoomKit()

room = await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(
        idle_timeout_seconds=300,     # Pause after 5 min idle
        max_duration_seconds=3600,    # Close after 1 hour
    ),
)
Timer Effect
idle_timeout_seconds Pauses the room after N seconds without activity
max_duration_seconds Closes the room after N seconds total

Lifecycle Hooks

@kit.hook(HookTrigger.ON_ROOM_PAUSED)
async def on_paused(event, ctx):
    logger.info(f"Room {event.room_id} paused due to inactivity")
    # Notify participants, save state, etc.

@kit.hook(HookTrigger.ON_ROOM_CLOSED)
async def on_closed(event, ctx):
    logger.info(f"Room {event.room_id} closed")
    # Archive conversation, generate summary, etc.

Complete Production Setup

Combining all patterns for a resilient deployment:

from __future__ import annotations

from roomkit import RoomKit
from roomkit.channels import AIChannel, SMSChannel
from roomkit.core.rate_limit import RateLimit, TokenBucketRateLimiter
from roomkit.models.channel import RetryPolicy
from roomkit.providers.ai.anthropic import AnthropicAIProvider
from roomkit.providers.ai.openai import OpenAIAIProvider
from roomkit.store.postgres import PostgresStore
from roomkit.telemetry import OpenTelemetryProvider

# Production store + telemetry
store = PostgresStore("postgresql://user:pass@db/roomkit")
telemetry = OpenTelemetryProvider(service_name="roomkit-prod")

kit = RoomKit(
    store=store,
    rate_limiter=TokenBucketRateLimiter(),
    telemetry=telemetry,
)

# AI with fallback and retries
primary = AnthropicAIProvider(model="claude-sonnet-4-20250514", api_key="...")
fallback = OpenAIAIProvider(model="gpt-4o", api_key="...")

ai = AIChannel(
    "ai-assistant",
    provider=primary,
    fallback_provider=fallback,
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
)

# SMS with rate limiting
sms = SMSChannel(
    "sms-main",
    provider=twilio_provider,
    rate_limit=RateLimit(requests_per_second=10, burst_size=20),
)

kit.register_channel(ai)
kit.register_channel(sms)

# Room with lifecycle timers
room = await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(idle_timeout_seconds=300, max_duration_seconds=3600),
)

Health Monitoring

Use telemetry spans and metrics to monitor resilience patterns:

from __future__ import annotations

from roomkit import HookExecution, HookTrigger

@kit.hook(HookTrigger.ON_ERROR, execution=HookExecution.ASYNC)
async def on_error(event, ctx):
    logger.error(f"Error in {event.channel_id}: {event.error}")
    # Emit metric to monitoring system
    metrics.increment("roomkit.errors", tags={"channel": event.channel_id})

@kit.hook(HookTrigger.ON_DELIVERY_STATUS, execution=HookExecution.ASYNC)
async def track_delivery(event, ctx):
    metrics.increment(
        "roomkit.delivery",
        tags={"status": event.status, "channel": event.channel_id},
    )

The OpenTelemetryProvider automatically instruments all framework operations with spans — monitor via Jaeger, DataDog, or any OTEL-compatible backend. See the Telemetry guide for setup details.