Audio Pipeline

Audio processing pipeline for voice channels. See the Audio Pipeline Stages guide for usage examples.

Pipeline

AudioPipeline

AudioPipeline(config, *, backend_capabilities=NONE, backend_feeds_aec_reference=False)

Orchestrates audio frame processing through pipeline stages.

Inbound processing order

[Resampler] -> [Recorder tap] -> [DTMF] -> [AEC] -> [AGC] -> [Denoiser] -> [VAD] -> [Diarization]

Outbound processing order

[PostProcessors] -> [Recorder tap] -> AEC.feed_reference -> [Resampler]

AEC and AGC stages are skipped when the backend declares NATIVE_AEC / NATIVE_AGC capabilities.
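The capability-based skipping can be sketched as follows. This is an illustrative stand-in, not the library's actual implementation; the stage names and the string form of the capability flags are assumptions for demonstration:

```python
# Illustrative sketch of capability-based stage skipping; not the library's code.
def inbound_stages(backend_capabilities: set[str]) -> list[str]:
    """Return the inbound stage order, dropping stages the backend handles natively."""
    stages = ["resampler", "recorder_tap", "dtmf", "aec", "agc",
              "denoiser", "vad", "diarization"]
    if "NATIVE_AEC" in backend_capabilities:
        stages.remove("aec")
    if "NATIVE_AGC" in backend_capabilities:
        stages.remove("agc")
    return stages
```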

on_speech_end

on_speech_end(callback)

Register callback for when VAD detects speech end.

on_speech_frame

on_speech_frame(callback)

Register callback for processed audio frames during speech.

on_processed_frame

on_processed_frame(callback)

Register callback for every processed inbound frame.

Fires after all pipeline stages (AEC, denoiser, VAD, etc.) for every frame, regardless of speech state. Used by continuous STT streaming when no local VAD is configured.

on_vad_event

on_vad_event(callback)

Register callback for all VAD events.

on_speaker_change

on_speaker_change(callback)

Register callback for speaker change detection.

on_dtmf

on_dtmf(callback)

Register callback for DTMF tone detection.

on_recording_started

on_recording_started(callback)

Register callback for recording start.

on_recording_stopped

on_recording_stopped(callback)

Register callback for recording stop.

set_parent_span

set_parent_span(session_id, span_id)

Set the parent span (VOICE_SESSION) for pipeline spans.

process_frame

process_frame(session, frame)

Process a single inbound audio frame through the pipeline.

Backwards-compatible alias for process_inbound().

process_inbound

process_inbound(session, frame)

Process a single inbound audio frame through the pipeline.

[Resampler] -> [Recorder tap] -> [DTMF] -> [AEC] -> [AGC] -> [Denoiser] -> [VAD] -> [Diarization]

process_outbound

process_outbound(session, frame)

Process a single outbound audio frame through the pipeline.

[PostProcessors] -> [Recorder tap] -> AEC.feed_reference -> [Resampler]

enable_playback_aec_feed

enable_playback_aec_feed()

Mark that AEC reference is fed at playback time.

When called, process_outbound() skips its own aec.feed_reference() to avoid double-feeding with misaligned timing.

feed_aec_reference

feed_aec_reference(frame)

Feed an AEC reference frame directly (from speaker output).

Called by the backend's speaker callback at playback time so the AEC has time-aligned reference for echo cancellation.

Thread-safety: may be called from the audio I/O thread. Uses a separate resampler instance from process_outbound to avoid thread-safety issues.

on_session_active

on_session_active(session)

Called when a voice session becomes active.

Cleans up stale state for this session and starts recording if configured.

on_session_ended

on_session_ended(session)

Called when a voice session ends.

Stops recording and debug taps if active.

reset

reset()

Reset all pipeline stage state.

close

close()

Release all pipeline resources.

AudioPipelineConfig dataclass

AudioPipelineConfig(vad=None, denoiser=None, diarization=None, postprocessors=list(), vad_config=None, aec=None, agc=None, agc_config=None, dtmf=None, turn_detector=None, backchannel_detector=None, recorder=None, recording_config=None, interruption=None, resampler=None, contract=None, debug_taps=None, telemetry=None)

Configuration for the audio processing pipeline.

All stages are optional. At least one provider should be set for the pipeline to be useful.

Typical combinations:

  • VoiceChannel (STT path): vad (+ optional denoiser/diarization)
  • RealtimeVoiceChannel (speech-to-speech): denoiser and/or diarization (VAD not needed — the AI provider handles turn detection)
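A configuration sketch for the STT path. The provider instances are placeholders (any `VADProvider`/`DenoiserProvider` implementation would do); only the `AudioPipelineConfig`, `VADConfig`, and `AudioPipeline` names come from this page:

```python
# Configuration sketch: provider instances are placeholders, not confirmed classes.
config = AudioPipelineConfig(
    vad=my_vad_provider,        # any VADProvider, e.g. a Silero-based one
    denoiser=my_denoiser,       # optional: applied before VAD
    vad_config=VADConfig(silence_threshold_ms=700),
)
pipeline = AudioPipeline(config)
```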

vad class-attribute instance-attribute

vad = None

Optional Voice Activity Detection provider.

denoiser class-attribute instance-attribute

denoiser = None

Optional denoiser applied before VAD.

diarization class-attribute instance-attribute

diarization = None

Optional speaker diarization applied after VAD.

postprocessors class-attribute instance-attribute

postprocessors = field(default_factory=list)

Optional postprocessors applied on the outbound path.

vad_config class-attribute instance-attribute

vad_config = None

Optional VAD-specific configuration override.

aec class-attribute instance-attribute

aec = None

Optional Acoustic Echo Cancellation provider.

agc class-attribute instance-attribute

agc = None

Optional Automatic Gain Control provider.

agc_config class-attribute instance-attribute

agc_config = None

Optional AGC-specific configuration override.

dtmf class-attribute instance-attribute

dtmf = None

Optional DTMF tone detector (runs in parallel with main chain).

turn_detector class-attribute instance-attribute

turn_detector = None

Optional post-STT turn completion detector.

backchannel_detector class-attribute instance-attribute

backchannel_detector = None

Optional backchannel detector for semantic interruption strategy.

recorder class-attribute instance-attribute

recorder = None

Optional audio recorder.

recording_config class-attribute instance-attribute

recording_config = None

Optional recording configuration.

interruption class-attribute instance-attribute

interruption = None

Optional interruption (barge-in) configuration.

resampler class-attribute instance-attribute

resampler = None

Optional resampler provider for format conversion.

contract class-attribute instance-attribute

contract = None

Optional input/output format contract.

debug_taps class-attribute instance-attribute

debug_taps = None

Optional diagnostic audio capture at pipeline stage boundaries.

telemetry class-attribute instance-attribute

telemetry = None

Optional telemetry provider for pipeline metrics.

AudioFrame dataclass

AudioFrame(data, sample_rate=16000, channels=1, sample_width=2, timestamp_ms=None, metadata=dict())

A single frame of inbound audio for pipeline processing.

AudioFrame flows through the audio pipeline stages: denoiser -> VAD -> diarization. Each stage may annotate the metadata dict with its results.

This is distinct from AudioChunk, which is used for outbound TTS audio streaming.

data instance-attribute

data

Raw audio bytes (PCM).

sample_rate class-attribute instance-attribute

sample_rate = 16000

Sample rate in Hz.

channels class-attribute instance-attribute

channels = 1

Number of audio channels.

sample_width class-attribute instance-attribute

sample_width = 2

Bytes per sample (2 = 16-bit PCM).
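The byte length of a frame's `data` follows directly from these fields. For example, a 20 ms frame at the defaults (16 kHz, mono, 16-bit PCM) is 640 bytes:

```python
def frame_bytes(duration_ms: int, sample_rate: int = 16000,
                channels: int = 1, sample_width: int = 2) -> int:
    """Byte length of a PCM frame with the given duration and format."""
    samples = sample_rate * duration_ms // 1000
    return samples * channels * sample_width

frame_bytes(20)  # 640 bytes: 320 samples x 1 channel x 2 bytes
```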

timestamp_ms class-attribute instance-attribute

timestamp_ms = None

Timestamp in milliseconds (relative to session start).

metadata class-attribute instance-attribute

metadata = field(default_factory=dict)

Pipeline stages annotate results here.

VAD (Voice Activity Detection)

VADProvider

Bases: ABC

Abstract base class for Voice Activity Detection providers.

name abstractmethod property

name

Provider name (e.g. 'silero', 'webrtc').

process abstractmethod

process(frame)

Process an audio frame and optionally return a VAD event.

Parameters:

  • frame (AudioFrame, required): The audio frame to analyse.

Returns:

  • VADEvent | None: A VADEvent if a state transition occurred, else None.

reset

reset()

Reset internal state (e.g. between utterances).

close

close()

Release resources.

VADConfig dataclass

VADConfig(silence_threshold_ms=500, speech_pad_ms=300, min_speech_duration_ms=250, extra=dict())

Configuration for VAD processing.

silence_threshold_ms class-attribute instance-attribute

silence_threshold_ms = 500

Milliseconds of silence before triggering SPEECH_END.

speech_pad_ms class-attribute instance-attribute

speech_pad_ms = 300

Padding added around detected speech segments.

min_speech_duration_ms class-attribute instance-attribute

min_speech_duration_ms = 250

Minimum speech duration to trigger events.

extra class-attribute instance-attribute

extra = field(default_factory=dict)

Provider-specific configuration.
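How these thresholds might interact can be sketched with a toy state machine over per-frame speech booleans. This is illustrative only (the library's actual logic, including `speech_pad_ms` pre-roll handling, is not reproduced here), assuming 20 ms frames:

```python
# Illustrative sketch of VADConfig threshold semantics; not the library's code.
def vad_events(speech_flags: list[bool], frame_ms: int = 20,
               silence_threshold_ms: int = 500,
               min_speech_duration_ms: int = 250) -> list[str]:
    """Map per-frame speech booleans to SPEECH_START/SPEECH_END events."""
    events: list[str] = []
    in_speech, speech_ms, silence_ms = False, 0, 0
    for is_speech in speech_flags:
        if is_speech:
            speech_ms += frame_ms
            silence_ms = 0
            # SPEECH_START fires only once speech exceeds the minimum duration.
            if not in_speech and speech_ms >= min_speech_duration_ms:
                in_speech = True
                events.append("SPEECH_START")
        else:
            silence_ms += frame_ms
            if not in_speech:
                speech_ms = 0  # too-short blip: discard accumulated speech
            elif silence_ms >= silence_threshold_ms:
                in_speech = False
                speech_ms = 0
                events.append("SPEECH_END")
    return events
```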

VADEvent dataclass

VADEvent(type, audio_bytes=None, confidence=None, duration_ms=None, level_db=None)

Event produced by a VAD provider.

type instance-attribute

type

The type of VAD event.

audio_bytes class-attribute instance-attribute

audio_bytes = None

Speech audio. Set on SPEECH_START (pre-roll buffer) and SPEECH_END (full accumulated speech including pre-roll).

confidence class-attribute instance-attribute

confidence = None

Confidence score (0.0 to 1.0).

duration_ms class-attribute instance-attribute

duration_ms = None

Duration in milliseconds (speech or silence).

level_db class-attribute instance-attribute

level_db = None

Audio level in dB (set on AUDIO_LEVEL).

VADEventType

Bases: StrEnum

Types of VAD events.

MockVADProvider

MockVADProvider(events=None)

Bases: VADProvider

Mock VAD provider that returns a preconfigured sequence of events.

Example

from roomkit.voice.pipeline.vad.base import VADEvent, VADEventType

events = [
    VADEvent(type=VADEventType.SPEECH_START),
    None,
    VADEvent(type=VADEventType.SPEECH_END, audio_bytes=b"audio"),
]
vad = MockVADProvider(events=events)

Denoiser

DenoiserProvider

Bases: ABC

Abstract base class for audio denoising providers.

name abstractmethod property

name

Provider name (e.g. 'rnnoise', 'deepfilter').

process abstractmethod

process(frame)

Denoise an audio frame.

Parameters:

  • frame (AudioFrame, required): The noisy audio frame.

Returns:

  • AudioFrame: A new or modified AudioFrame with reduced noise.

reset

reset()

Reset internal state.

close

close()

Release resources.

MockDenoiserProvider

MockDenoiserProvider()

Bases: DenoiserProvider

Mock denoiser that passes frames through unchanged.

Tracks processed frames for test assertions.

Diarization

DiarizationProvider

Bases: ABC

Abstract base class for speaker diarization providers.

name abstractmethod property

name

Provider name (e.g. 'pyannote', 'resemblyzer').

process abstractmethod

process(frame)

Analyse an audio frame for speaker identity.

Parameters:

  • frame (AudioFrame, required): The audio frame to analyse.

Returns:

  • DiarizationResult | None: A DiarizationResult if a speaker was identified, else None.

reset

reset()

Reset internal state.

close

close()

Release resources.

DiarizationResult dataclass

DiarizationResult(speaker_id, confidence, is_new_speaker)

Result from a diarization provider.

speaker_id instance-attribute

speaker_id

Identified speaker label (e.g. 'speaker_0').

confidence instance-attribute

confidence

Confidence score (0.0 to 1.0).

is_new_speaker instance-attribute

is_new_speaker

True if this is the first time this speaker has been seen.

MockDiarizationProvider

MockDiarizationProvider(results=None)

Bases: DiarizationProvider

Mock diarization provider that returns a preconfigured sequence of results.

Example

results = [
    DiarizationResult(speaker_id="speaker_0", confidence=0.9, is_new_speaker=True),
    DiarizationResult(speaker_id="speaker_0", confidence=0.95, is_new_speaker=False),
]
diarizer = MockDiarizationProvider(results=results)

TTS Stream Filters

TTSStreamFilter

Bases: ABC

Base class for TTS text filters.

Supports both streaming (chunk-by-chunk via feed()/flush()) and non-streaming (full text via __call__()) usage.

Subclasses must implement feed(), flush(), and reset(). The default __call__() delegates to feed() followed by flush(), but subclasses may override it with a more efficient implementation (e.g. a single regex pass).

feed abstractmethod

feed(chunk)

Process one streaming token/chunk. Return cleaned text (may be empty).

flush abstractmethod

flush()

Flush any buffered text at end-of-stream. Return remaining cleaned text.

reset abstractmethod

reset()

Reset internal state for a new utterance.

__call__

__call__(text)

Non-streaming convenience: filter a complete text string.
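The contract above can be sketched with a self-contained stand-in. This re-implements the described feed/flush/__call__ delegation and the buffering behaviour for illustration; it is not the library's source:

```python
from abc import ABC, abstractmethod

class FilterSketch(ABC):
    """Illustrative stand-in for the TTSStreamFilter contract described above."""

    @abstractmethod
    def feed(self, chunk: str) -> str: ...

    @abstractmethod
    def flush(self) -> str: ...

    @abstractmethod
    def reset(self) -> None: ...

    def __call__(self, text: str) -> str:
        # Default non-streaming path: reset, one feed, then flush.
        self.reset()
        return self.feed(text) + self.flush()

class BracketStripper(FilterSketch):
    """Streaming [...] stripper: buffers once '[' is seen until ']' closes."""

    def __init__(self) -> None:
        self._buf = ""

    def feed(self, chunk: str) -> str:
        out: list[str] = []
        for ch in chunk:
            if self._buf:             # inside a bracketed span
                if ch == "]":
                    self._buf = ""    # span closed: discard it entirely
                else:
                    self._buf += ch
            elif ch == "[":
                self._buf = "["       # start buffering a possible marker
            else:
                out.append(ch)
        return "".join(out)

    def flush(self) -> str:
        # An unclosed "[..." at end-of-stream is emitted as-is rather than lost.
        out, self._buf = self._buf, ""
        return out

    def reset(self) -> None:
        self._buf = ""
```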

StripBrackets

StripBrackets()

Bases: TTSStreamFilter

Strip all [...] bracketed content from TTS text.

A simpler variant of StripInternalTags that catches any bracketed marker, e.g. [Respond in French], [laughs], [thinking].

StripInternalTags

StripInternalTags()

Bases: TTSStreamFilter

Strip [internal]...[/internal] and [internal: ...] blocks.

Handles two formats that AI models commonly produce:

  • Paired tags: [internal]reasoning here[/internal] spoken text
  • Single bracket: [internal: reasoning here] spoken text

In streaming mode, buffers text when [internal is detected and discards everything up to the matching close. Text outside tags is passed through immediately.

In non-streaming mode (__call__), a single regex removes all tagged blocks.
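An illustrative regex for the non-streaming pass over the two formats described above (this is an assumption about the pattern, not the library's actual one):

```python
import re

# Illustrative pattern covering both described formats; not the library's regex.
_INTERNAL = re.compile(
    r"\[internal\].*?\[/internal\]"  # paired tags, non-greedy across lines
    r"|\[internal:[^\]]*\]",         # single-bracket form
    re.DOTALL,
)

def strip_internal(text: str) -> str:
    """Remove all [internal]...[/internal] and [internal: ...] blocks."""
    return _INTERNAL.sub("", text)
```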

Events & Callbacks

SpeakerChangeEvent dataclass

SpeakerChangeEvent(session, speaker_id, confidence, is_new_speaker, timestamp=_utcnow())

Speaker change detected by diarization.

This event is fired when the audio pipeline's diarization stage detects a different speaker than the previous frame.

session instance-attribute

session

The voice session where the change was detected.

speaker_id instance-attribute

speaker_id

The new speaker's identifier.

confidence instance-attribute

confidence

Confidence score for the speaker identification (0.0 to 1.0).

is_new_speaker instance-attribute

is_new_speaker

True if this speaker has not been seen before in this session.

timestamp class-attribute instance-attribute

timestamp = field(default_factory=_utcnow)

When the speaker change was detected.

BargeInCallback module-attribute

BargeInCallback = Callable[[VoiceSession], Any]

Callback for barge-in detection: (session).

AudioReceivedCallback module-attribute

AudioReceivedCallback = Callable[['VoiceSession', Any], Any]