Source code for whisper_web.inputstream_generator

import numpy as np
import asyncio
import sys
import torch

from pydantic import BaseModel, Field
from typing import AsyncGenerator
import soundfile as sf
import resampy
from datetime import datetime

from whisper_web.events import EventBus, AudioChunkGenerated, AudioChunkNum
from whisper_web.types import AudioChunk

try:
    import sounddevice as sd
except OSError as e:
    print(e)
    print("If `GLIBCXX_x.x.x' not found, try installing it with: conda install -c conda-forge libstdcxx-ng=12")
    sys.exit()


class GeneratorConfig(BaseModel):
    """Configuration model for controlling audio input generation behavior.

    This configuration class is used to define how audio should be captured, processed,
    and segmented before being sent to a speech recognition system.
    """

    samplerate: int = Field(
        default=16000,
        description="The specified samplerate of the audio data.",
    )
    blocksize: int = Field(
        default=6000,
        description="The size of each individual audio chunk.",
    )
    max_length_s: int = Field(
        default=25,
        description="The maximum length of the audio data.",
    )
    adjustment_time: int = Field(
        default=5,
        description="The adjustment time for setting the silence threshold.",
    )
    min_chunks: int = Field(
        default=3,
        description="The minimum number of chunks to be generated before feeding them into the ASR model.",
    )
    phrase_delta: float = Field(
        default=1.0,
        description="The expected pause between two phrases in seconds.",
    )
    continuous: bool = Field(
        default=True,
        description="Whether to generate audio data continuously or not.",
    )
    from_file: str = Field(
        default="",
        description="The path to the audio file to be used for inference.",
    )
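
# Example (an added, hedged usage sketch, not part of the original module): constructing a
# GeneratorConfig with non-default values. Field names and defaults come from the class above;
# the concrete values below are purely illustrative.
#
#   config = GeneratorConfig(
#       samplerate=16000,   # 16 kHz input, the default expected by the generator
#       blocksize=8000,     # half-second blocks at 16 kHz
#       max_length_s=20,    # cap each dispatched chunk at 20 s of audio
#       phrase_delta=1.5,   # allow 1.5 s of silence before a phrase is closed
#       continuous=False,   # stop after the first dispatched phrase
#   )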

class InputStreamGenerator:
    """Handles real-time or file-based audio input for speech processing and transcription.

    This class manages the lifecycle of audio input, from capturing or loading audio data to
    detecting speech segments and dispatching them for transcription. It supports both live
    microphone streams and pre-recorded audio files, and includes a configurable silence-based
    voice activity detection (VAD) heuristic.

    Core Features:

    - **Real-Time Audio Input**: Captures audio using a microphone input stream.
    - **File-Based Input**: Reads and processes audio from a file if specified.
    - **Silence Threshold Calibration**: Dynamically computes the silence threshold based on environmental noise.
    - **Voice Activity Detection (VAD)**: Uses a silence-based heuristic to detect speech activity.
    - **Phrase Segmentation**: Aggregates audio buffers into speech phrases based on silence duration and loudness.
    - **Asynchronous Processing**: Fully asynchronous design suitable for non-blocking audio pipelines.

    :param generator_config: Configuration object with audio processing settings
    :type generator_config: :class:`GeneratorConfig`
    :param event_bus: Event bus used to publish generated audio chunks
    :type event_bus: :class:`EventBus`

    :ivar samplerate: Sample rate for audio processing as :class:`int`.
    :ivar blocksize: Size of each audio block as :class:`int`.
    :ivar adjustment_time: Time in seconds for adjusting the silence threshold as :class:`int`.
    :ivar min_chunks: Minimum number of chunks to process as :class:`int`.
    :ivar continuous: Flag for continuous processing as :class:`bool`.
    :ivar event_bus: Event bus used to publish audio events as :class:`EventBus`.
    :ivar global_ndarray: Global buffer for audio data as :class:`np.ndarray`.
    :ivar phrase_delta_blocks: Maximum number of silent blocks allowed in between phrases as :class:`int`.
    :ivar silence_threshold: Threshold for silence detection as :class:`float`.
    :ivar max_blocksize: Maximum size of a dispatched audio chunk in samples as :class:`int`.
    :ivar max_chunks: Maximum number of blocks per dispatched chunk as :class:`float`.
    :ivar from_file: Path to the audio file if specified as :class:`str`.

    .. note::

        Instantiate this class with a `GeneratorConfig` and an `EventBus`, then call
        `process_audio()` to start listening or processing input.
    """

    def __init__(self, generator_config: GeneratorConfig, event_bus: EventBus):
        self.samplerate = generator_config.samplerate
        self.blocksize = generator_config.blocksize
        self.adjustment_time = generator_config.adjustment_time
        self.min_chunks = generator_config.min_chunks
        self.continuous = generator_config.continuous
        self.event_bus = event_bus

        self.global_ndarray: np.ndarray = np.array([])
        self.phrase_delta_blocks: int = int((self.samplerate // self.blocksize) * generator_config.phrase_delta)
        self.silence_threshold = -1
        self.max_blocksize = generator_config.max_length_s * self.samplerate
        self.max_chunks = generator_config.max_length_s * self.samplerate / self.blocksize

        self.from_file = generator_config.from_file
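
    # Worked example (added for orientation, not part of the original module) of the derived
    # values with the GeneratorConfig defaults:
    #   samplerate=16000, blocksize=6000, phrase_delta=1.0, max_length_s=25
    #   phrase_delta_blocks = int((16000 // 6000) * 1.0) = 2 silent blocks end a phrase
    #   max_blocksize       = 25 * 16000 = 400000 samples per dispatched chunk
    #   max_chunks          = 400000 / 6000 ≈ 66.7 blocks before a forced dispatch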

    async def process_audio(self) -> None:
        """Entry point for audio processing based on the selected input source.

        Determines whether audio is read from a file or captured from the live input stream.

        .. note::

            - If `from_file` is set, the file is read and streamed in chunks via `generate_from_file()`.
            - Otherwise, the silence threshold is calibrated first and the live stream is processed
              with the silence-based heuristic.
        """
        if self.from_file:
            await self.generate_from_file(self.from_file)
        else:
            await self.set_silence_threshold()
            await self.process_with_heuristic()

    async def generate(self) -> AsyncGenerator:
        """Asynchronously generates audio chunks for processing from a live input stream.

        This method acts as a unified audio generator, yielding blocks of audio data for downstream processing.

        Behavior:

        - Opens an audio input stream using `sounddevice.InputStream`.
        - Captures audio in blocks of `self.blocksize`, configured for mono 16-bit input.
        - Uses a thread-safe callback to push incoming audio data into an `asyncio.Queue`.
        - Yields `(in_data, status)` tuples from the queue as they become available.

        :return: An asynchronous generator yielding tuples of the raw audio block and its status.
        :rtype: AsyncGenerator
        """
        q_in = asyncio.Queue()
        loop = asyncio.get_event_loop()

        def callback(in_data, _, __, state):
            loop.call_soon_threadsafe(q_in.put_nowait, (in_data.copy(), state))

        # Default stream args
        stream_args = dict(  # type: ignore
            samplerate=self.samplerate,
            channels=1,
            dtype="int16",  # type: ignore
            blocksize=self.blocksize,
            callback=callback,  # type: ignore
        )

        stream = sd.InputStream(**stream_args)

        with stream:
            while True:
                indata, status = await q_in.get()
                yield indata, status

    async def generate_from_file(self, file_path: str) -> None:
        """Processes audio data from a file and simulates streaming for transcription.

        This method reads audio from the given file path, converts it to mono and resamples it if
        necessary, and then splits the audio into chunks that simulate live microphone input. Each
        chunk is dispatched via `send_audio()`, which publishes an `AudioChunkGenerated` event.

        Workflow:

        1. **File Loading**:
           - Reads audio from the specified file using `soundfile`.
           - Multi-channel audio is converted to mono by averaging the channels.

        2. **Resampling**:
           - If the audio file's sample rate differs from the expected rate (`self.samplerate`),
             the data is resampled to match.

        3. **Chunking**:
           - Audio is divided into blocks of `self.max_blocksize` samples.
           - The final chunk is zero-padded if it is shorter than the expected size.

        4. **Dispatch**:
           - The total number of chunks is announced via an `AudioChunkNum` event.
           - Each chunk is set as the current buffer and dispatched for transcription using `send_audio()`.

        :param file_path: Path to the audio file to be processed
        :type file_path: str
        """
        data, samplerate = sf.read(file_path, dtype="float32")

        # Mono conversion (average the channels)
        if np.ndim(data) > 1:
            data = np.mean(data, axis=1)  # type: ignore

        # Resample if needed
        if samplerate != self.samplerate:
            data = resampy.resample(data.astype(np.float32), samplerate, self.samplerate)

        # Scale the normalized float32 data to the int16 range expected by send_audio()
        data = (data * 32767).astype(np.float32)

        num_chunks = int(np.ceil(len(data) / self.max_blocksize))
        print(f"Processing file {file_path} with {num_chunks} chunks of size {self.max_blocksize}")
        await self.event_bus.publish(AudioChunkNum(num_chunks=num_chunks))

        # Yield chunks of max_blocksize
        for i in range(0, len(data), self.max_blocksize):
            chunk = data[i : i + self.max_blocksize]  # type: ignore

            # Pad the last chunk if it's too short
            if len(chunk) < self.max_blocksize:
                chunk = np.pad(chunk, (0, self.max_blocksize - len(chunk)), "constant", constant_values=0)  # type: ignore

            self.global_ndarray = chunk
            await self.send_audio(True)
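
    # Worked chunking example (added illustration, not part of the original module), assuming the
    # defaults and a hypothetical 60 s mono file already at 16 kHz:
    #   len(data)   = 60 * 16000 = 960000 samples
    #   num_chunks  = ceil(960000 / 400000) = 3
    #   chunk sizes = 400000, 400000, 160000 -> the last chunk is zero-padded to 400000 samples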

    async def process_with_heuristic(self) -> None:
        """Continuously processes audio input, detects significant speech segments, and dispatches them for transcription.

        This method operates in an asynchronous loop, consuming real-time audio buffers from `generate()`
        and aggregating meaningful speech segments while filtering out silence or noise based on the
        calibrated silence threshold.

        Behavior:

        - **Silence Detection**:
            - Buffers with low average volume (below `self.silence_threshold`) are considered silent.
            - Once several consecutive silent blocks are detected (`self.phrase_delta_blocks`), the current
              speech phrase is considered complete and dispatched via `send_audio()`.

        - **Buffer Aggregation**:
            - Incoming non-silent buffers are accumulated in `self.global_ndarray`.
            - If the aggregated data reaches the maximum number of blocks (`self.max_chunks`), it is
              dispatched immediately; otherwise it is dispatched once it spans at least `self.min_chunks`
              blocks.

        - **Modes**:
            - In continuous mode (`self.continuous` = True), the method loops indefinitely to process ongoing audio.
            - Otherwise, it exits after the first complete speech phrase is processed.
        """
        empty_blocks = 0

        async for indata, _ in self.generate():
            indata_flattened = abs(indata.flatten())

            silence = np.mean(indata_flattened) <= self.silence_threshold
            ending_silence = np.mean(indata_flattened[-500:-1]) <= self.silence_threshold  # type: ignore
            starting_silence = np.mean(indata_flattened[:500]) <= self.silence_threshold  # type: ignore

            print(f"Silence: {silence}, Starting Silence: {starting_silence}, Ending Silence: {ending_silence}")

            # Dispatch the global ndarray once the maximum number of chunks is reached
            if self.global_ndarray.size / self.blocksize == self.max_chunks:
                await self.send_audio(True)
                self.global_ndarray = np.array([])  # reset
                empty_blocks = 0

            # Concatenate the buffer if its start, end, or whole is not silent
            if not starting_silence or not ending_silence or not silence:
                if self.global_ndarray.size > 0:
                    self.global_ndarray = np.concatenate((self.global_ndarray, indata), dtype="int16")
                else:
                    self.global_ndarray = indata

            # Count buffers that contain mostly silence; close and dispatch the phrase after enough of them
            if self.global_ndarray.size > 0 and silence:
                empty_blocks += 1
                if empty_blocks >= self.phrase_delta_blocks:
                    empty_blocks = 0
                    await self.send_audio(True)
                    self.global_ndarray = np.array([])  # reset
                    if not self.continuous:
                        return
                continue

            empty_blocks = 0
            if self.global_ndarray.size / self.blocksize >= self.min_chunks:
                await self.send_audio()
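
    # Worked example of the heuristic's thresholds with the defaults (added illustration,
    # not part of the original module):
    #   phrase_delta_blocks = 2  -> two consecutive mostly-silent blocks (2 * 6000 / 16000 = 0.75 s)
    #                               close the current phrase and dispatch it as final
    #   min_chunks = 3           -> at least 3 * 6000 = 18000 samples (~1.1 s) are buffered before
    #                               a non-final dispatch is published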

    async def process_raw_audio(self) -> None:
        """Continuously collects and dispatches raw audio chunks without any VAD filtering.

        This method is used when the VAD heuristic is not enabled. It simply accumulates incoming audio
        buffers and dispatches them once a minimum number of chunks (`self.min_chunks`) has been collected.

        Behavior:

        - Audio is collected unfiltered from the input stream.
        - When the total buffered data reaches the defined threshold, it is sent for transcription.
        - In non-continuous mode (`self.continuous` = False), processing stops after the first valid dispatch.
        """
        async for indata, _ in self.generate():
            # Concatenate buffers
            if self.global_ndarray.size > 0:
                self.global_ndarray = np.concatenate((self.global_ndarray, indata), dtype="int16")  # type: ignore
            else:
                self.global_ndarray = indata

            # Dispatch the global ndarray once the required number of chunks is met
            if self.global_ndarray.size / self.blocksize >= self.min_chunks:
                await self.send_audio()
                if not self.continuous:
                    return

    async def send_audio(self, is_final: bool = False) -> None:
        """Dispatches the collected audio buffer as an event after normalization.

        This method converts the internal audio buffer (`self.global_ndarray`) from the 16-bit PCM range
        to a normalized float32 waveform in the range [-1.0, 1.0], wraps it in an :class:`AudioChunk`,
        and publishes it on the event bus as an :class:`AudioChunkGenerated` event.

        :param is_final: Whether this chunk completes the current phrase or file
        :type is_final: bool
        """
        # Normalize int16 to float32 waveform in range [-1.0, 1.0]
        waveform = torch.from_numpy(self.global_ndarray.flatten().astype("float32") / 32768.0)
        audio_chunk = AudioChunk(data=waveform, timestamp=datetime.now())

        # Publish the audio chunk event
        await self.event_bus.publish(AudioChunkGenerated(audio_chunk, is_final))

    async def set_silence_threshold(self) -> None:
        """Dynamically determines and sets the silence threshold based on initial audio input.

        This method analyzes the average loudness of incoming audio blocks during a short calibration
        phase to determine an appropriate silence threshold. The threshold helps distinguish between
        background noise and meaningful speech during audio processing.

        How it works:

        1. **Calibration Phase**:
           - Processes audio blocks for a predefined duration (`adjustment_time` in seconds).
           - For each block, computes the mean absolute loudness and stores it.

        2. **Threshold Calculation**:
           - After enough blocks are collected, calculates the average loudness across all blocks.
           - Sets `self.silence_threshold` to this value, treating it as the baseline for silence.

        .. note::

            - This method is skipped if audio is being read from a file (`self.from_file` is set).
            - Intended to run once before audio processing begins, helping tailor silence detection
              to the environment.
        """
        blocks_processed: int = 0
        loudness_values: list = []

        async for indata, _ in self.generate():
            blocks_processed += 1
            indata_flattened: np.ndarray = abs(indata.flatten())

            # Compute loudness over the first few seconds to adjust the silence threshold
            loudness_values.append(np.mean(indata_flattened))  # type: ignore

            # Stop recording after `adjustment_time` seconds
            if blocks_processed >= self.adjustment_time * (self.samplerate / self.blocksize):
                self.silence_threshold = float(np.mean(loudness_values))  # type: ignore
                print(f"Silence threshold set to {self.silence_threshold}")
                break
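
# Hedged usage sketch (added example, not part of the original module): wiring the generator to an
# EventBus and driving it with asyncio. This assumes EventBus() can be constructed without
# arguments; consult whisper_web.events for the actual API. The file path is hypothetical.
#
#   async def main() -> None:
#       event_bus = EventBus()
#       config = GeneratorConfig(from_file="speech.wav")
#       generator = InputStreamGenerator(config, event_bus)
#       await generator.process_audio()
#
#   if __name__ == "__main__":
#       asyncio.run(main())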