Production Resilience
RoomKit provides built-in patterns for production deployments: circuit breakers, retry policies, rate limiting, delivery tracking, and room lifecycle timers.
Circuit Breaker
Prevents cascading failures by fast-failing requests to unhealthy providers:
from __future__ import annotations

from roomkit.models.delivery import CircuitBreaker

breaker = CircuitBreaker(
    failure_threshold=5,    # Open after 5 consecutive failures
    recovery_timeout=30.0,  # Try again after 30 seconds
    half_open_max=2,        # Allow 2 test requests in half-open state
)
States
CLOSED (normal)
│ failure_threshold consecutive failures
▼
OPEN (fast-fail)
│ recovery_timeout elapsed
▼
HALF_OPEN (testing)
├─ half_open_max successes → CLOSED
└─ any failure → OPEN
| State | Behavior |
|---|---|
| CLOSED | Normal operation. Failures increment the counter |
| OPEN | All requests fail immediately without calling the provider |
| HALF_OPEN | Limited test requests allowed. After half_open_max successes the breaker returns to CLOSED; any failure reopens it |
| Parameter | Default | Description |
|---|---|---|
| failure_threshold | 5 | Consecutive failures before opening |
| recovery_timeout | 30.0 | Seconds before trying again |
| half_open_max | 2 | Test requests allowed in half-open |
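The state transitions above can be sketched as a small state machine. This is an illustrative sketch only, not RoomKit's CircuitBreaker implementation: the class name SketchBreaker, its methods, and the injectable clock are assumptions made for the example. It also counts successes in the half-open state rather than limiting concurrent test requests.

```python
from __future__ import annotations

import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Illustrative breaker (hypothetical; not the RoomKit class)."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 half_open_max=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self._clock = clock          # injectable so tests can control time
        self.state = State.CLOSED
        self._failures = 0           # consecutive failures while CLOSED
        self._successes = 0          # successes while HALF_OPEN
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        """Return True if a call to the provider may proceed."""
        if self.state is State.OPEN:
            if self._clock() - self._opened_at >= self.recovery_timeout:
                # recovery_timeout elapsed: start testing the provider
                self.state = State.HALF_OPEN
                self._successes = 0
                return True
            return False             # fast-fail without calling the provider
        return True

    def record_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.half_open_max:
                self.state = State.CLOSED
                self._failures = 0
        else:
            self._failures = 0       # a success breaks the failure streak

    def record_failure(self) -> None:
        if self.state is State.HALF_OPEN:
            self._open()             # any failure while testing reopens
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._open()

    def _open(self) -> None:
        self.state = State.OPEN
        self._opened_at = self._clock()
        self._failures = 0
```

A caller would check allow_request() before each provider call, then report the outcome with record_success() or record_failure().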
Retry Policy
Exponential backoff with configurable limits:
from __future__ import annotations

from roomkit.models.channel import RetryPolicy

policy = RetryPolicy(
    max_retries=3,
    base_delay_seconds=1.0,
    exponential_base=2.0,
    max_delay_seconds=30.0,
)
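Delay formula: min(base_delay_seconds * exponential_base^(attempt - 1), max_delay_seconds), with attempts numbered from 1 as in the table below.
| Attempt | Delay |
|---|---|
| 1 | 1.0s |
| 2 | 2.0s |
| 3 | 4.0s |
| 4 | 8.0s |
With the values above, none of these delays reaches the 30.0s max_delay_seconds cap. The cap first applies at attempt 6, where the computed 32.0s is reduced to 30.0s.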
Used by AIChannel for provider calls (see AI Steering guide for fallback chains) and the delivery layer for transport providers.
Tip
Only retryable errors (5xx, timeouts) trigger retries. Non-retryable errors (4xx) fail immediately.
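The delay schedule and the retryable/non-retryable split can be sketched in a few lines. This is a minimal sketch, not RoomKit's code: the function names backoff_delay and is_retryable are hypothetical, and the sketch assumes attempts are numbered from 1 so that the first row of the table above (1.0s) corresponds to exponent 0.

```python
from __future__ import annotations


def backoff_delay(attempt: int, base_delay_seconds: float = 1.0,
                  exponential_base: float = 2.0,
                  max_delay_seconds: float = 30.0) -> float:
    """Delay before the given attempt (1-based), capped at max_delay_seconds."""
    raw = base_delay_seconds * exponential_base ** (attempt - 1)
    return min(raw, max_delay_seconds)


def is_retryable(status_code: int | None) -> bool:
    """Hypothetical classifier: None stands for a timeout, 5xx is retryable."""
    return status_code is None or 500 <= status_code <= 599


delays = [backoff_delay(attempt) for attempt in range(1, 7)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

The last value shows the cap in effect: the uncapped delay would be 32.0s.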
Rate Limiting
Token bucket rate limiting for inbound messages:
from __future__ import annotations

from roomkit import RoomKit
from roomkit.channels import SMSChannel
from roomkit.core.rate_limit import RateLimit, TokenBucketRateLimiter

limiter = TokenBucketRateLimiter()
kit = RoomKit(rate_limiter=limiter)

# twilio_provider is an SMS provider instance configured elsewhere
sms = SMSChannel(
    "sms",
    provider=twilio_provider,
    rate_limit=RateLimit(
        requests_per_second=10,
        burst_size=20,
    ),
)
| Parameter | Description |
|---|---|
| requests_per_second | Sustained rate limit |
| burst_size | Maximum burst above sustained rate |
The token bucket algorithm allows short bursts up to burst_size while enforcing the average rate over time. When the limit is exceeded, inbound messages are rejected before processing.
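The token bucket behavior described above can be sketched as follows. This is a generic illustration of the algorithm, not RoomKit's TokenBucketRateLimiter: the class TokenBucket, its try_acquire method, and the injectable clock are assumptions for the example, and the sketch treats burst_size as the bucket capacity.

```python
from __future__ import annotations

import time


class TokenBucket:
    """Illustrative token bucket (hypothetical; not the RoomKit class)."""

    def __init__(self, requests_per_second: float, burst_size: int,
                 clock=time.monotonic):
        self.rate = float(requests_per_second)   # tokens added per second
        self.capacity = float(burst_size)        # maximum stored tokens
        self._clock = clock
        self._tokens = self.capacity             # start with a full bucket
        self._last = clock()

    def try_acquire(self) -> bool:
        """Consume one token if available; return False to reject."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill in proportion to elapsed time, never above capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

With requests_per_second=10 and burst_size=20, an idle bucket accepts 20 messages back to back, then about 10 per second after that.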
Delivery Status Tracking
Track message delivery through three modes:
from __future__ import annotations

from roomkit import HookTrigger, RoomKit
from roomkit.models.delivery import DeliveryMode

kit = RoomKit()

# Configure delivery mode per channel binding
# (await must run inside an async function)
await kit.attach_channel("room-1", "sms-main", metadata={
    "delivery_mode": DeliveryMode.REQUIRE_DELIVERY,
})
| Mode | Behavior |
|---|---|
| FIRE_AND_FORGET | Send and don't wait for confirmation |
| REQUIRE_DELIVERY | Track delivery status, retry on failure |
| REQUIRE_ACK | Require acknowledgement from recipient |
Delivery Status Hook
import logging

logger = logging.getLogger(__name__)

@kit.hook(HookTrigger.ON_DELIVERY_STATUS)
async def on_delivery(event, ctx):
    # event contains DeliveryStatus with provider_id, status, timestamp
    if event.status == "failed":
        logger.error(f"Delivery failed to {event.channel_id}: {event.error}")
    elif event.status == "delivered":
        logger.info(f"Delivered to {event.channel_id}")
Room Lifecycle Timers
Automatically manage room state with timers:
from __future__ import annotations

from roomkit import HookTrigger, RoomKit
from roomkit.models.room import RoomTimers

kit = RoomKit()

room = await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(
        idle_timeout_seconds=300,   # Pause after 5 min idle
        max_duration_seconds=3600,  # Close after 1 hour
    ),
)
| Timer | Effect |
|---|---|
| idle_timeout_seconds | Pauses the room after N seconds without activity |
| max_duration_seconds | Closes the room after N seconds total |
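How the two timers interact can be sketched as a pure function over timestamps. This is an assumption-laden illustration, not RoomKit's scheduler: the function room_status, the status strings, and the choice to let max_duration_seconds take precedence over idle_timeout_seconds are all hypothetical.

```python
from __future__ import annotations


def room_status(now: float, created_at: float, last_activity_at: float,
                idle_timeout_seconds: float | None = None,
                max_duration_seconds: float | None = None) -> str:
    """Return 'closed', 'paused', or 'active' for a room at time `now`."""
    # Assumption: total lifetime is checked first, so closing wins over pausing
    if max_duration_seconds is not None and now - created_at >= max_duration_seconds:
        return "closed"
    if idle_timeout_seconds is not None and now - last_activity_at >= idle_timeout_seconds:
        return "paused"
    return "active"


# Room created at t=0, last message at t=100, timers as in the example above
print(room_status(200, 0, 100, 300, 3600))   # active
print(room_status(400, 0, 100, 300, 3600))   # paused
print(room_status(3600, 0, 3500, 300, 3600)) # closed
```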
Lifecycle Hooks
@kit.hook(HookTrigger.ON_ROOM_PAUSED)
async def on_paused(event, ctx):
    logger.info(f"Room {event.room_id} paused due to inactivity")
    # Notify participants, save state, etc.

@kit.hook(HookTrigger.ON_ROOM_CLOSED)
async def on_closed(event, ctx):
    logger.info(f"Room {event.room_id} closed")
    # Archive conversation, generate summary, etc.
Complete Production Setup
Combining all patterns for a resilient deployment:
from __future__ import annotations

from roomkit import RoomKit
from roomkit.channels import AIChannel, SMSChannel
from roomkit.core.rate_limit import RateLimit, TokenBucketRateLimiter
from roomkit.models.channel import RetryPolicy
from roomkit.models.room import RoomTimers
from roomkit.providers.ai.anthropic import AnthropicAIProvider
from roomkit.providers.ai.openai import OpenAIAIProvider
from roomkit.store.postgres import PostgresStore
from roomkit.telemetry import OpenTelemetryProvider

# Production store + telemetry
store = PostgresStore("postgresql://user:pass@db/roomkit")
telemetry = OpenTelemetryProvider(service_name="roomkit-prod")

kit = RoomKit(
    store=store,
    rate_limiter=TokenBucketRateLimiter(),
    telemetry=telemetry,
)

# AI with fallback and retries
primary = AnthropicAIProvider(model="claude-sonnet-4-20250514", api_key="...")
fallback = OpenAIAIProvider(model="gpt-4o", api_key="...")

ai = AIChannel(
    "ai-assistant",
    provider=primary,
    fallback_provider=fallback,
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
)

# SMS with rate limiting
# (twilio_provider is an SMS provider instance configured elsewhere)
sms = SMSChannel(
    "sms-main",
    provider=twilio_provider,
    rate_limit=RateLimit(requests_per_second=10, burst_size=20),
)

kit.register_channel(ai)
kit.register_channel(sms)

# Room with lifecycle timers (await must run inside an async function)
room = await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(idle_timeout_seconds=300, max_duration_seconds=3600),
)
Health Monitoring
Use telemetry spans and metrics to monitor resilience patterns:
from __future__ import annotations

from roomkit import HookExecution, HookTrigger

# kit is the RoomKit instance; logger and metrics stand in for your own
# logging.Logger and metrics client.

@kit.hook(HookTrigger.ON_ERROR, execution=HookExecution.ASYNC)
async def on_error(event, ctx):
    logger.error(f"Error in {event.channel_id}: {event.error}")
    # Emit metric to monitoring system
    metrics.increment("roomkit.errors", tags={"channel": event.channel_id})

@kit.hook(HookTrigger.ON_DELIVERY_STATUS, execution=HookExecution.ASYNC)
async def track_delivery(event, ctx):
    metrics.increment(
        "roomkit.delivery",
        tags={"status": event.status, "channel": event.channel_id},
    )
The OpenTelemetryProvider automatically instruments all framework operations with spans, which you can monitor via Jaeger, Datadog, or any OpenTelemetry-compatible backend. See the Telemetry guide for setup details.