Source code for whisper_web.inputstream_generator
import numpy as np
import asyncio
import sys
import torch
from pydantic import BaseModel, Field
from typing import AsyncGenerator
import soundfile as sf
import resampy
from datetime import datetime
from whisper_web.events import EventBus, AudioChunkGenerated, AudioChunkNum
from whisper_web.types import AudioChunk
try:
import sounddevice as sd
except OSError as e:
print(e)
print("If `GLIBCXX_x.x.x' not found, try installing it with: conda install -c conda-forge libstdcxx-ng=12")
sys.exit()
class GeneratorConfig(BaseModel):
"""Configuration model for controlling audio input generation behavior.
This configuration class is used to define how audio should be captured,
processed, and segmented before being sent to a speech recognition system.
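Example:
    A minimal, illustrative configuration for live microphone capture
    (the values shown are simply the documented defaults)::

        config = GeneratorConfig(samplerate=16000, blocksize=6000, phrase_delta=1.0, continuous=True)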
"""
samplerate: int = Field(
default=16000,
description="The specified samplerate of the audio data.",
)
blocksize: int = Field(
default=6000,
description="The size of each individual audio chunk.",
)
max_length_s: int = Field(
default=25,
description="The maximum length of the audio data.",
)
adjustment_time: int = Field(
default=5,
description="The adjustment_time for setting the silence threshold.",
)
min_chunks: int = Field(
default=3,
description="The minimum number of chunks to be generated, before feeding it into the asr model.",
)
phrase_delta: float = Field(
default=1.0,
description="The expected pause between two phrases in seconds.",
)
continuous: bool = Field(
default=True,
description="Whether to generate audio data conituously or not.",
)
from_file: str = Field(
default="",
description="The path to the audio file to be used for inference.",
)
class InputStreamGenerator:
"""Handles real-time or file-based audio input for speech processing and transcription.
This class manages the lifecycle of audio input—from capturing or loading audio data to detecting
speech segments and dispatching them for transcription. It supports both live microphone streams
and pre-recorded audio files, and includes configurable voice activity detection (VAD) heuristics
and silence detection.
Core Features:
- **Real-Time Audio Input**: Captures audio using a microphone input stream.
- **File-Based Input**: Reads and processes audio from a file if specified.
- **Silence Threshold Calibration**: Dynamically computes the silence threshold based on environmental noise.
- **Voice Activity Detection (VAD)**: Uses a loudness-based silence heuristic to segment speech.
- **Phrase Segmentation**: Aggregates audio buffers into speech phrases based on silence duration and loudness.
- **Asynchronous Processing**: Fully asynchronous design suitable for non-blocking audio pipelines.
:param generator_config: Configuration object with audio processing settings
:type generator_config: :class:`GeneratorConfig`
:param event_bus: Event bus used to publish generated audio chunks and chunk counts
:type event_bus: :class:`EventBus`
:ivar samplerate: Sample rate for audio processing as :class:`int`.
:ivar blocksize: Size of each audio block as :class:`int`.
:ivar adjustment_time: Time in seconds used to calibrate the silence threshold as :class:`int`.
:ivar min_chunks: Minimum number of chunks to process as :class:`int`.
:ivar continuous: Flag for continuous processing as :class:`bool`.
:ivar global_ndarray: Global buffer for audio data as :class:`np.ndarray`.
:ivar phrase_delta_blocks: Maximum number of silent blocks allowed between phrases as :class:`int`.
:ivar silence_threshold: Threshold for silence detection as :class:`float`.
:ivar max_blocksize: Maximum size of an audio block in samples as :class:`int`.
:ivar max_chunks: Maximum number of chunks per dispatched phrase as :class:`float`.
:ivar from_file: Path to the audio file, if specified, as :class:`str`.
.. note::
Instantiate this class with a `GeneratorConfig` and an `EventBus`, then call `process_audio()`
to start listening or processing input.
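Example:
    A minimal sketch of wiring the generator into an application; the
    no-argument ``EventBus()`` construction is an assumption for illustration::

        config = GeneratorConfig()
        event_bus = EventBus()  # assumed construction; adapt to your setup
        generator = InputStreamGenerator(config, event_bus)
        await generator.process_audio()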
"""
def __init__(self, generator_config: GeneratorConfig, event_bus: EventBus):
self.samplerate = generator_config.samplerate
self.blocksize = generator_config.blocksize
self.adjustment_time = generator_config.adjustment_time
self.min_chunks = generator_config.min_chunks
self.continuous = generator_config.continuous
self.event_bus = event_bus
self.global_ndarray: np.ndarray = np.array([])
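# Derived sizes below, illustrated with the default config
# (samplerate=16000, blocksize=6000, phrase_delta=1.0, max_length_s=25):
#   phrase_delta_blocks = int((16000 // 6000) * 1.0) = 2 silent blocks end a phrase
#   max_blocksize       = 25 * 16000 = 400000 samples per file chunk
#   max_chunks          = 400000 / 6000 ≈ 66.7 blocks before a forced dispatch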
self.phrase_delta_blocks: int = int((self.samplerate // self.blocksize) * generator_config.phrase_delta)
self.silence_threshold = -1
self.max_blocksize = generator_config.max_length_s * self.samplerate
self.max_chunks = generator_config.max_length_s * self.samplerate / self.blocksize
self.from_file = generator_config.from_file
async def process_audio(self) -> None:
"""Entry point for audio processing based on the selected VAD configuration.
Determines whether to stream audio from a file or from the live input stream.
.. note::
- If `from_file` is set: streams the file contents via `generate_from_file()`.
- Otherwise: calibrates the silence threshold, then runs `process_with_heuristic()` on the live microphone stream.
"""
if self.from_file:
await self.generate_from_file(self.from_file)
else:
await self.set_silence_threshold()
await self.process_with_heuristic()
async def generate(self) -> AsyncGenerator:
"""Asynchronously generates audio chunks for processing from a live input stream.
This method acts as a unified audio generator, yielding blocks of audio data for downstream processing.
Behavior:
- Opens an audio input stream using `sounddevice.InputStream`.
- Captures audio in blocks of `self.blocksize`, configured for mono 16-bit input.
- Uses a thread-safe callback to push incoming audio data into an `asyncio.Queue`.
- Yields `(in_data, status)` tuples from the queue as they become available.
:return: Yields tuples of the raw audio block and its callback status.
:rtype: AsyncGenerator[tuple[np.ndarray, sd.CallbackFlags], None]
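Example:
    Illustrative consumption of the stream by a hypothetical caller::

        async for indata, status in generator.generate():
            if status:
                print(status)  # report any over-/underflow flags
            # indata is a mono int16 block of `blocksize` samples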
"""
q_in = asyncio.Queue()
loop = asyncio.get_event_loop()
def callback(in_data, _, __, state):
loop.call_soon_threadsafe(q_in.put_nowait, (in_data.copy(), state))
# Default stream args
stream_args = dict( # type: ignore
samplerate=self.samplerate,
channels=1,
dtype="int16", # type: ignore
blocksize=self.blocksize,
callback=callback, # type: ignore
)
stream = sd.InputStream(
**stream_args,
)
with stream:
while True:
indata, status = await q_in.get()
yield indata, status
async def generate_from_file(self, file_path: str) -> None:
"""Processes audio data from a file and simulates streaming for transcription.
This method reads audio from the given file path, optionally converts it to mono and resamples it,
and then splits the audio into chunks that simulate live microphone input. Each chunk is published
on the event bus as an :class:`AudioChunkGenerated` event.
Workflow:
1. **File Loading**:
- Reads audio from the specified file using `soundfile`.
- Supports multi-channel audio, which is converted to mono by averaging across channels.
2. **Resampling**:
- If the audio file's sample rate differs from the expected rate (`self.samplerate`),
the data is resampled to match.
3. **Chunking**:
- Audio is divided into blocks of `self.max_blocksize` samples.
- The final chunk is zero-padded if it is shorter than the expected size.
4. **Dispatch**:
- Each chunk is set as the current buffer and dispatched via `send_audio()`.
5. **Progress Info**:
- Publishes the expected number of chunks as an :class:`AudioChunkNum` event and prints a short summary.
:param file_path: Path to the audio file to be processed
:type file_path: str
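Example:
    Illustrative direct call with a hypothetical file path; in normal use the
    path comes from ``GeneratorConfig.from_file`` via :meth:`process_audio`::

        await generator.generate_from_file("recording.wav")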
"""
data, samplerate = sf.read(file_path, dtype="float32")
# Mono conversion: average across channels if the file is multi-channel
if np.ndim(data) > 1:
data = np.mean(data, axis=1)  # type: ignore
# Resample if needed
if samplerate != self.samplerate:
data = resampy.resample(data.astype(np.float32), samplerate, self.samplerate)
# Scale to the int16 range (kept as float32) so the normalization in `send_audio` matches the microphone path
data = (data * 32767).astype(np.float32)
num_chunks = int(np.ceil(len(data) / self.max_blocksize))
print(f"Processing file {file_path} with {num_chunks} chunks of size {self.max_blocksize}")
await self.event_bus.publish(AudioChunkNum(num_chunks=num_chunks))
# Yield chunks of blocksize
for i in range(0, len(data), self.max_blocksize):
chunk = data[i : i + self.max_blocksize] # type: ignore
# Pad the last chunk if it's too short
if len(chunk) < self.max_blocksize:
chunk = np.pad(chunk, (0, self.max_blocksize - len(chunk)), "constant", constant_values=0) # type: ignore
self.global_ndarray = chunk
await self.send_audio(True)
async def process_with_heuristic(self) -> None:
"""Continuously processes audio input, detects significant speech segments, and dispatches them for transcription.
This method operates in an asynchronous loop, consuming real-time audio buffers from `generate()`, aggregating
meaningful speech segments while filtering out silence or noise based on a calculated silence threshold.
Behavior:
- **Silence Detection**:
- Buffers with low average volume (below `self.silence_threshold`) are considered silent.
- If several consecutive silent blocks are detected,
the current speech phrase is considered complete and dispatched via `send_audio()`.
- **Buffer Aggregation**:
- Incoming buffers are accumulated in `self.global_ndarray`.
- If a buffer ends in silence and the aggregated data meets the minimum required chunks (`self.min_chunks`),
the accumulated audio is dispatched.
- **Modes**:
- In continuous mode (`self.continuous` = True), the method loops indefinitely to process ongoing audio.
- Otherwise, it exits after the first valid speech phrase is processed.
- **File Mode**:
- If processing from a file (`self.from_file` is not empty), silence threshold setup is skipped.
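Example:
With the default configuration (samplerate=16000, blocksize=6000, phrase_delta=1.0),
`phrase_delta_blocks` is 2, so a phrase is closed after two consecutive silent blocks
(roughly 0.75 s of audio); a forced dispatch happens once the buffer reaches
`max_chunks` blocks (about 25 s with `max_length_s=25`).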
"""
empty_blocks = 0
async for indata, _ in self.generate():
indata_flattened = abs(indata.flatten())
silence = np.mean(indata_flattened) <= self.silence_threshold
ending_silence = np.mean(indata_flattened[-500:-1]) <= self.silence_threshold # type: ignore
starting_silence = np.mean(indata_flattened[:500]) <= self.silence_threshold # type: ignore
print(f"Silence: {silence}, Starting Silence: {starting_silence}, Ending Silence: {ending_silence}")
# Process the global ndarray if the max chunks are met
if self.global_ndarray.size / self.blocksize >= self.max_chunks:
await self.send_audio(True)
self.global_ndarray = np.array([]) # reset
empty_blocks = 0
# continue if start/ending/whole of buffer is not silent
if not starting_silence or not ending_silence or not silence:
# concatenate buffers
if self.global_ndarray.size > 0:
self.global_ndarray = np.concatenate((self.global_ndarray, indata), dtype="int16")
else:
self.global_ndarray = indata
# count consecutive silent blocks; enough of them ends the current phrase
if self.global_ndarray.size > 0 and silence:
empty_blocks += 1
if empty_blocks >= self.phrase_delta_blocks:
empty_blocks = 0
await self.send_audio(True)
self.global_ndarray = np.array([]) # reset
if not self.continuous:
return
continue
empty_blocks = 0
if self.global_ndarray.size / self.blocksize >= self.min_chunks:
await self.send_audio()
async def process_raw_audio(self) -> None:
"""Continuously collects and dispatches raw audio chunks without any VAD filtering.
This method is used when the VAD heuristic is not enabled.
It simply accumulates incoming audio buffers and dispatches them once a minimum
number of chunks (`self.min_chunks`) has been collected.
Behavior:
- Audio is collected unfiltered from the input stream.
- When the total buffered data reaches the defined threshold, it is sent for transcription.
- In non-continuous mode (`self.continuous` = False), processing stops after the first valid dispatch.
"""
async for indata, _ in self.generate():
# concatenate buffers
if self.global_ndarray.size > 0:
self.global_ndarray = np.concatenate((self.global_ndarray, indata), dtype="int16") # type: ignore
else:
self.global_ndarray = indata
# Process the global ndarray if the required chunks are met
if self.global_ndarray.size / self.blocksize >= self.min_chunks:
await self.send_audio()
if not self.continuous:
return
async def send_audio(self, is_final: bool = False) -> None:
"""Dispatches the collected audio buffer for transcription after normalization.
This method converts the internal audio buffer (`self.global_ndarray`) from
16-bit PCM scale to a normalized float32 waveform in the range [-1.0, 1.0]
and publishes it on the event bus.
Behavior:
- Wraps the normalized waveform in an :class:`AudioChunk` stamped with the current time.
- Publishes an :class:`AudioChunkGenerated` event carrying the chunk and the `is_final` flag.
- Does not clear the internal buffer; callers reset `self.global_ndarray` once a phrase is complete.
:param is_final: Whether this chunk completes the current phrase
:type is_final: bool
"""
# Normalize int16 to float32 waveform in range [-1.0, 1.0]
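# e.g. an int16 sample of 16384 maps to 16384 / 32768.0 = 0.5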
waveform = torch.from_numpy(self.global_ndarray.flatten().astype("float32") / 32768.0)
audio_chunk = AudioChunk(data=waveform, timestamp=datetime.now())
# Publish the audio chunk event
await self.event_bus.publish(AudioChunkGenerated(audio_chunk, is_final))
async def set_silence_threshold(self) -> None:
"""Dynamically determines and sets the silence threshold based on initial audio input.
This method analyzes the average loudness of incoming audio blocks during a short calibration phase
to determine an appropriate silence threshold. The threshold helps distinguish between background
noise and meaningful speech during audio processing.
How it works:
1. **Calibration Phase**:
- Processes audio blocks for a predefined duration (`adjustment_time` in seconds).
- For each block, computes the mean absolute loudness and stores it.
2. **Threshold Calculation**:
- After enough blocks are collected, calculates the average loudness across all blocks.
- Sets `self.silence_threshold` to this value, treating it as the baseline for silence.
.. note::
- This method is skipped if audio is being read from a file (`self.from_file` is set).
- Intended to run once before audio processing begins, helping tailor silence detection to the environment.
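Example:
With the defaults (adjustment_time=5, samplerate=16000, blocksize=6000), calibration
stops once `blocks_processed >= 5 * 16000 / 6000 ≈ 13.3`, i.e. after 14 blocks,
which corresponds to roughly 5.25 s of audio.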
"""
blocks_processed: int = 0
loudness_values: list = []
async for indata, _ in self.generate():
blocks_processed += 1
indata_flattened: np.ndarray = abs(indata.flatten())
# Compute loudness over first few seconds to adjust silence threshold
loudness_values.append(np.mean(indata_flattened)) # type: ignore
# Stop calibrating after `adjustment_time` seconds
if blocks_processed >= self.adjustment_time * (self.samplerate / self.blocksize):
self.silence_threshold = float(np.mean(loudness_values)) # type: ignore
print(f"Silence threshold set to {self.silence_threshold}")
break