Delivery

Delivery Models

InboundMessage

Bases: BaseModel

A message received from an external provider.

For stateful channels (voice, persistent WebSocket), set the session field to the session object. Once the message passes the hook pipeline, process_inbound calls channel.connect_session() to bind the long-lived session to the room.

InboundResult

Bases: BaseModel

Result of processing an inbound message.

DeliveryResult

Bases: BaseModel

Result of delivering an event to a channel.

ProviderResult

Bases: BaseModel

Result from a provider delivery attempt.

Delivery Strategies

DeliveryStrategy

Bases: ABC

Controls when and how content is delivered to a channel.

deliver abstractmethod async

deliver(ctx)

Deliver the content according to this strategy.
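A custom strategy subclasses DeliveryStrategy and implements the async deliver(ctx) method. The sketch below is self-contained and does not import RoomKit: SketchStrategy mirrors only the documented method signature, while SketchContext, its send() method, and DelayedSend are hypothetical names invented for illustration. The real shape of ctx is not described in this reference.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class SketchContext:
    """Hypothetical stand-in for the ctx argument; not RoomKit's real type."""

    content: str
    sent: list = field(default_factory=list)

    async def send(self) -> None:
        # Record the content instead of talking to a real channel.
        self.sent.append(self.content)


class SketchStrategy(ABC):
    """Local stand-in mirroring the documented DeliveryStrategy interface."""

    @abstractmethod
    async def deliver(self, ctx: SketchContext) -> None:
        ...


class DelayedSend(SketchStrategy):
    """Hypothetical strategy: wait a fixed delay, then send."""

    def __init__(self, delay: float = 0.01) -> None:
        self.delay = delay

    async def deliver(self, ctx: SketchContext) -> None:
        await asyncio.sleep(self.delay)
        await ctx.send()


async def demo_delayed_send() -> list:
    ctx = SketchContext(content="hello")
    await DelayedSend().deliver(ctx)
    return ctx.sent
```

Because deliver is abstract, instantiating SketchStrategy directly raises TypeError; only concrete subclasses such as DelayedSend can be created.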

Immediate

Bases: DeliveryStrategy

Send now. May interrupt ongoing TTS playback.

WaitForIdle

WaitForIdle(buffer=1.0, playback_timeout=15.0)

Bases: DeliveryStrategy

Wait for TTS/speech to finish, then send.

Parameters:

Name | Type | Description | Default
buffer | float | Extra seconds to wait after playback ends. | 1.0
playback_timeout | float | Max seconds to wait for playback. | 15.0
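The interaction between the two parameters can be sketched with plain asyncio. This is not RoomKit's implementation: wait_for_idle and the playback_done event are hypothetical, and the choice to skip the buffer when the wait times out is an assumption made for the sketch.

```python
import asyncio


async def wait_for_idle(
    playback_done: asyncio.Event,
    buffer: float = 1.0,
    playback_timeout: float = 15.0,
) -> bool:
    """Return True if playback ended in time, False if the wait timed out."""
    try:
        # Wait for playback to finish, but never longer than playback_timeout.
        await asyncio.wait_for(playback_done.wait(), timeout=playback_timeout)
    except asyncio.TimeoutError:
        # Assumption: on timeout the caller proceeds without the extra buffer.
        return False
    # Playback ended: pause for the extra buffer before sending.
    await asyncio.sleep(buffer)
    return True


async def demo_wait_for_idle() -> tuple:
    finished_event = asyncio.Event()
    finished_event.set()  # playback already over
    finished = await wait_for_idle(finished_event, buffer=0.01, playback_timeout=0.1)

    stuck_event = asyncio.Event()  # never set: playback never ends
    timed_out = await wait_for_idle(stuck_event, buffer=0.01, playback_timeout=0.05)
    return finished, timed_out
```

With these inputs the first call reports that playback finished and the second reports a timeout, so a caller can decide whether to send anyway or drop the content.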

Queued

Queued(buffer=1.0, playback_timeout=15.0, separator='\n\n')

Bases: DeliveryStrategy

Add to queue, deliver at next idle window.

Multiple deliveries are batched into a single message.

Parameters:

Name | Type | Description | Default
buffer | float | Extra seconds to wait after playback ends. | 1.0
playback_timeout | float | Max seconds to wait for playback. | 15.0
separator | str | Text inserted between batched items (default: two newlines, i.e. a blank line). | '\n\n'
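Batching with a separator amounts to joining the pending texts. The helper below is a hypothetical illustration of that step only; how Queued stores pending deliveries internally is not documented here.

```python
def batch_messages(pending: list, separator: str = "\n\n") -> str:
    """Join queued texts into one message, as a sketch of Queued's batching."""
    return separator.join(pending)


# Two queued texts become a single message with a blank line between them.
combined = batch_messages(["Order shipped.", "Invoice attached."])
```

Passing a different separator, for example " | ", keeps the batch on one line, which may suit channels that render newlines poorly.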

Delivery Backend

DeliveryBackend

Bases: ABC

ABC for persistent delivery queue backends.

Implementations provide a durable queue with at-least-once semantics via the enqueue / dequeue / ack / nack lifecycle.

enqueue abstractmethod async

enqueue(item)

Add a delivery item to the queue.

dequeue abstractmethod async

dequeue(worker_id, batch_size=1, timeout=5.0)

Claim up to batch_size items. Blocks up to timeout seconds.

ack abstractmethod async

ack(item_id)

Acknowledge successful delivery — removes the item.

nack abstractmethod async

nack(item_id, error=None)

Negative-acknowledge. Re-enqueues or dead-letters the item.

dead_letter abstractmethod async

dead_letter(item_id, error)

Move an item to the dead-letter queue.

get_queue_depth abstractmethod async

get_queue_depth()

Return the number of pending items (observability).

get_dead_letter_items abstractmethod async

get_dead_letter_items(limit=50)

Return items in the dead-letter queue.

start async

start(kit)

Called by RoomKit on startup. Override to start a worker loop.

close async

close()

Called by RoomKit.close(). Override for graceful shutdown.
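The enqueue / dequeue / ack / nack lifecycle can be illustrated with a toy backend and worker loop. Everything below is a sketch under stated assumptions: ToyBackend, SketchItem, run_worker, and the max_attempts retry limit are hypothetical, and the class does not subclass the real DeliveryBackend. Only the method names and signatures follow this reference.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchItem:
    """Hypothetical stand-in for DeliveryItem."""

    id: str
    payload: str
    attempts: int = 0


class ToyBackend:
    """In-memory sketch of the documented lifecycle (not durable)."""

    def __init__(self, max_attempts: int = 3) -> None:
        self._queue = asyncio.Queue()
        self._in_flight = {}
        self.dead = []  # (item, error) pairs
        self.max_attempts = max_attempts  # assumption: simple retry limit

    async def enqueue(self, item: SketchItem) -> None:
        await self._queue.put(item)

    async def dequeue(self, worker_id: str, batch_size: int = 1, timeout: float = 5.0) -> list:
        try:
            # Block up to `timeout` seconds for the first item.
            first = await asyncio.wait_for(self._queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return []
        items = [first]
        while len(items) < batch_size and not self._queue.empty():
            items.append(self._queue.get_nowait())
        for item in items:
            self._in_flight[item.id] = item  # claimed until ack or nack
        return items

    async def ack(self, item_id: str) -> None:
        self._in_flight.pop(item_id, None)

    async def nack(self, item_id: str, error=None) -> None:
        item = self._in_flight.pop(item_id, None)
        if item is None:
            return
        item.attempts += 1
        if item.attempts >= self.max_attempts:
            self.dead.append((item, error or "unknown"))
        else:
            await self._queue.put(item)  # retry later


async def run_worker(backend: ToyBackend, send, worker_id: str = "w1") -> None:
    while True:
        batch = await backend.dequeue(worker_id, batch_size=10, timeout=0.05)
        if not batch:
            return  # queue drained; a real worker would keep polling
        for item in batch:
            try:
                await send(item)
            except Exception as exc:
                await backend.nack(item.id, error=str(exc))
            else:
                await backend.ack(item.id)


async def demo_lifecycle() -> tuple:
    backend = ToyBackend(max_attempts=2)
    delivered = []
    flaky_calls = 0

    async def send(item: SketchItem) -> None:
        nonlocal flaky_calls
        if item.payload == "bad":
            raise RuntimeError("provider rejected")
        if item.payload == "flaky":
            flaky_calls += 1
            if flaky_calls == 1:
                raise RuntimeError("temporary failure")
        delivered.append(item.payload)

    for index, payload in enumerate(["ok", "flaky", "bad"]):
        await backend.enqueue(SketchItem(id=str(index), payload=payload))
    await run_worker(backend, send)
    return delivered, [item.payload for item, _ in backend.dead]
```

An item is removed only after ack, so a failure between send and ack leads to a retry rather than a loss. That is why at-least-once delivery can produce duplicates, and why receivers generally need to tolerate them.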

DeliveryItem

Bases: BaseModel

Serializable delivery request — the unit of work in the queue.

DeliveryItemStatus

Bases: StrEnum

Lifecycle status of a delivery item.

InMemoryDeliveryBackend

InMemoryDeliveryBackend(max_queue_size=1000, max_dead_letter_size=1000)

Bases: DeliveryBackend

Asyncio-queue-based delivery backend (single process, no persistence).

Items flow through _queue → _in_flight → acked/dead-lettered. A background worker task drains the queue and executes deliveries.

start async

start(kit)

Start the background worker loop.

close async

close()

Stop the worker loop. In-flight items are re-enqueued.
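The shutdown behaviour described above (stop the worker, then put in-flight items back on the queue) might look roughly like the following. ShutdownSketch and its attributes are hypothetical and simplified; the sketch only shows the ordering of cancel and re-enqueue, not the actual InMemoryDeliveryBackend code.

```python
import asyncio


class ShutdownSketch:
    """Hypothetical sketch: cancel the worker, then re-enqueue claimed items."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        self.in_flight = {}
        self._worker = None

    async def start(self) -> None:
        self._worker = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while True:
            item_id, payload = await self.queue.get()
            self.in_flight[item_id] = payload
            await asyncio.sleep(3600)  # simulate a delivery that is still running

    async def close(self) -> None:
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass  # expected: the worker was cancelled on purpose
        # Return every claimed-but-unfinished item to the queue.
        for item_id, payload in list(self.in_flight.items()):
            self.queue.put_nowait((item_id, payload))
        self.in_flight.clear()


async def demo_shutdown() -> tuple:
    backend = ShutdownSketch()
    await backend.queue.put(("1", "hello"))
    await backend.start()
    await asyncio.sleep(0.01)  # let the worker claim the item
    claimed = len(backend.in_flight)
    await backend.close()
    return claimed, backend.queue.qsize(), len(backend.in_flight)
```

Since the backend has no persistence, re-enqueued items survive only as long as the process does; they are not recovered after a restart.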