Examples
Installation/Usage:
We recommend using uv as the Python package manager.
Installation instructions for uv are available at https://docs.astral.sh/uv/getting-started/installation/.
To install the dependencies, run the following command:
.. code-block:: bash

    $ uv sync
After completing the installation, one can now use the transcriber.
Simply start the server and the CLI client:

.. code-block:: bash

    # In two separate terminal sessions
    $ make local.server
    $ make local.cli
One can also run a frontend app. We provide a simple Streamlit app that shows the transcriptions. The CLI client still has to be running in the background. We provide a Makefile task to run the server, frontend and CLI client all at once:

.. code-block:: bash

    # In another terminal session
    $ make local.run-cli
Low Level Usage:
Running the server in a python script:
import asyncio
import signal
import sys
from whisper_web.server import TranscriptionServer
from whisper_web.whisper_model import ModelConfig
from app.helper import is_running_in_docker
# Automatically set HOST based on execution environment:
# "0.0.0.0" when is_running_in_docker() is true, "127.0.0.1" otherwise.
HOST = "0.0.0.0" if is_running_in_docker() else "127.0.0.1"
# Port passed to TranscriptionServer together with HOST.
PORT = 8000
def create_default_model_config() -> ModelConfig:
    """Return the default ModelConfig: small model on CPU, continuous, no VAD."""
    settings = {
        "model_size": "small",  # small model keeps startup fast
        "device": "cpu",  # CPU for broader compatibility
        "continuous": True,
        "use_vad": False,
        "samplerate": 16000,
    }
    return ModelConfig(**settings)
async def monitor_sessions(server: TranscriptionServer, interval: int = 30):
    """Periodically clean up inactive sessions and print a summary of the rest.

    Loops forever. Each round sleeps ``interval`` seconds, asks the server to
    clean up inactive sessions, then prints one line per remaining session
    (inference state, audio queue size, transcription count). Any ``Exception``
    raised during a round is printed and the loop carries on.
    """
    while True:
        try:
            await asyncio.sleep(interval)
            await server.cleanup_inactive_sessions()

            sessions = server.client_sessions
            total = len(sessions)
            if total <= 0:
                # Nothing to report this round.
                continue

            print(f"Active sessions: {total}")
            for sid, entry in sessions.items():
                task = entry.inference_task
                # "running" only while an inference task exists and is unfinished.
                state = "running" if (task and not task.done()) else "stopped"
                pending = entry.manager.audio_queue.qsize()
                produced = len(entry.manager.transcriptions)
                print(f" {sid}: {state}, queue={pending}, transcriptions={produced}")
        except Exception as e:
            print(f"Session monitor error: {e}")
async def main():
    """Start the transcription server, monitor its sessions and clean up on exit.

    Builds a ``TranscriptionServer`` with the default model configuration,
    prints a startup banner, starts the server and a background session
    monitor, then idles until the coroutine is interrupted or cancelled.

    Cleanup lives in ``finally`` rather than under ``except KeyboardInterrupt``.
    This script's signal handler calls ``sys.exit``, and ``asyncio.run`` then
    cancels this task, so shutdown normally arrives here as a cancellation and
    a ``KeyboardInterrupt`` clause alone would never run the cleanup.
    """
    # Create server with default configuration
    default_config = create_default_model_config()
    server = TranscriptionServer(default_model_config=default_config, host=HOST, port=PORT)

    print("Starting Multi-Client Transcription Server")
    print("=" * 40)
    print(f"Environment: {'Docker' if is_running_in_docker() else 'Local/User'}")
    print(f"Host: {server.host}")
    print(f"Port: {server.port}")
    print(f"Default model: {default_config.model_size}")
    print(f"Default device: {default_config.device}")
    print()

    # Start the FastAPI server in a thread
    server.run()

    # Wait a moment for server to start
    await asyncio.sleep(2)

    # Start session monitoring
    monitor_task = asyncio.create_task(monitor_sessions(server))

    try:
        # Keep server running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        # Swallowed, as before, so the caller sees a normal return.
        pass
    finally:
        print("\nShutting down server...")

        # Cancel monitoring
        monitor_task.cancel()

        # Cleanup all sessions; snapshot the ids because removal mutates the mapping.
        print("Cleaning up sessions...")
        for session_id in list(server.client_sessions):
            try:
                await server.remove_session(session_id)
            except Exception as e:
                # Best effort: one failing session must not block the others.
                print(f"Error cleaning up session {session_id}: {e}")

        print("Server stopped.")
def signal_handler(signum, frame):
    """Print the received signal number, then exit the process with status 0.

    ``frame`` is part of the signal-handler signature and is not used.
    """
    announcement = "\nReceived signal {}".format(signum)
    print(announcement)
    sys.exit(0)
if __name__ == "__main__":
    # Route both interrupt and termination signals through signal_handler.
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, signal_handler)

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer stopped by user")
    except Exception as e:
        # Any other failure is reported and mapped to a non-zero exit status.
        print(f"Server error: {e}")
        sys.exit(1)
Note
The server receives the audio data from the client and manages the transcription process.
Running a simple client in a python script:
import asyncio
import aiohttp
import websockets
import soundfile as sf
import io
from whisper_web.inputstream_generator import GeneratorConfig, InputStreamGenerator
from whisper_web.management import AudioManager
from whisper_web.events import EventBus
from app.helper import get_server_urls
# Base URLs returned by get_server_urls(): API_BASE_URL is used for the HTTP
# session endpoints, WS_BASE_URL for the WebSocket transcription endpoint.
API_BASE_URL, WS_BASE_URL = get_server_urls()
async def create_session_with_model(
    base_url: str,
    model_size: str = "small",
    counter: int = 0,
    *,
    device: str = "cuda",
    max_retries: int = 15,
    retry_delay: float = 1.0,
) -> str | None:
    """Create a transcription session on the server, retrying on failure.

    POSTs a model configuration to ``{base_url}/sessions``. A non-200 response
    or any other error counts as a failed attempt: the error is printed, the
    coroutine sleeps ``retry_delay`` seconds and tries again until the failure
    count reaches ``max_retries``.

    Args:
        base_url: Base URL of the HTTP API.
        model_size: Value sent as ``"model_size"`` in the configuration.
        counter: Number of failures already accumulated before this call.
        device: Value sent as ``"device"``. Defaults to ``"cuda"``, the value
            that was previously hard-coded.
        max_retries: Retrying stops once the failure count reaches this value.
        retry_delay: Seconds to wait after each failed attempt.

    Returns:
        The ``session_id`` reported by the server, or ``None`` if every
        attempt failed.
    """
    model_config = {
        "model_size": model_size,
        "device": device,
        "continuous": True,
        "use_vad": False,
        "samplerate": 16000,
    }

    failed_tries = counter
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(f"{base_url}/sessions", json=model_config) as response:
                    if response.status != 200:
                        raise aiohttp.ClientResponseError(
                            request_info=response.request_info,
                            history=response.history,
                            status=response.status,
                            message=f"Failed to create session: {response.status}",
                            headers=response.headers,
                        )
                    data = await response.json()
                    session_id = data["session_id"]
                    print(f"Created session {session_id} with model {model_size}")
                    return session_id
        except Exception as e:
            # Deliberately broad: connection errors, bad statuses and malformed
            # payloads are all treated as "try again".
            print(f"Error creating session: {e}")
            await asyncio.sleep(retry_delay)  # Wait a bit before retrying

        if failed_tries >= max_retries:
            return None
        failed_tries += 1
async def list_sessions(base_url: str):
    """Fetch ``{base_url}/sessions`` and print a summary of each session.

    Prints a failure line with the HTTP status when the response is not 200.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(f"{base_url}/sessions") as resp:
            if resp.status != 200:
                print(f"Failed to list sessions: {resp.status}")
                return

            payload = await resp.json()
            print(f"\nActive sessions ({payload['total_sessions']}):")
            for entry in payload["sessions"]:
                print(f" Session ID: {entry['session_id']}")
                print(f" Model: {entry['model_configuration']['model_size']}")
                print(f" Inference running: {entry['inference_running']}")
                print(f" Transcriptions: {entry['transcription_count']}")
                print(f" Audio queue size: {entry['audio_queue_size']}")
                print()
async def get_session_status(base_url: str, session_id: str):
    """Fetch and print the status of one session.

    Requests ``{base_url}/sessions/{session_id}/status``. On a 200 response
    prints the inference flag, audio queue size and model size; otherwise
    prints the HTTP status.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(f"{base_url}/sessions/{session_id}/status") as resp:
            if resp.status != 200:
                print(f"Failed to get session status: {resp.status}")
                return

            info = await resp.json()
            print(f"Session {session_id} status:")
            print(f" Inference running: {info['inference_running']}")
            print(f" Audio queue size: {info['audio_queue_size']}")
            print(f" Model: {info['model_configuration']['model_size']}")
async def stream_audio(session_id: str, manager: AudioManager):
    """Stream audio chunks from ``manager`` to the server over a WebSocket.

    Connects to ``{WS_BASE_URL}/ws/transcribe/{session_id}`` and loops forever:
    it pulls the next chunk from ``manager``, encodes non-empty chunks as WAV
    and sends them prefixed with a one-byte "final" flag. Connection and other
    errors are printed and end the loop. The socket is closed on every exit
    path, including task cancellation.

    Args:
        session_id: Session whose transcription endpoint receives the audio.
        manager: Source of audio chunks, read via ``get_next_audio_chunk()``.
    """
    # Connect to WebSocket
    uri = f"{WS_BASE_URL}/ws/transcribe/{session_id}"

    # Connect without a context manager; the ``finally`` below releases the socket.
    ws = await websockets.connect(uri)

    try:
        # Monitor and send audio chunks
        while True:
            audio_chunk = await manager.get_next_audio_chunk()
            if audio_chunk is None:
                await asyncio.sleep(0.1)
                continue  # No audio chunk available, skip iteration

            chunk, is_final = audio_chunk
            print(f"Processing audio chunk: {chunk.data.shape}, final: {is_final}")

            if chunk.data.numel() > 0:
                # Convert to numpy array (chunk.data is presumably a torch tensor — confirm)
                audio_data = chunk.data.detach().cpu().numpy()

                # Convert to WAV bytes
                with io.BytesIO() as buffer:
                    sf.write(buffer, audio_data, samplerate=16000, format="WAV")
                    wav_bytes = buffer.getvalue()

                # Custom binary protocol: first byte indicates if final (1) or not (0)
                final_flag = b"\x01" if is_final else b"\x00"
                message = final_flag + wav_bytes
                await ws.send(message)

            await asyncio.sleep(0.1)  # Prevent busy waiting
    except websockets.ConnectionClosedError as e:
        print(f"WebSocket connection closed: {e}")
    except Exception as e:
        print(f"Error during audio streaming: {e}")
    finally:
        # Runs on cancellation too, which the previous trailing close() did not.
        await ws.close()
async def get_transcriptions(base_url: str, session_id: str):
    """Fetch and print the transcriptions collected for one session.

    Requests ``{base_url}/sessions/{session_id}/transcriptions``. Prints a
    numbered list (starting at 1), a "none yet" line when the list is empty,
    or the HTTP status when the response is not 200.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(f"{base_url}/sessions/{session_id}/transcriptions") as resp:
            if resp.status != 200:
                print(f"Failed to get transcriptions: {resp.status}")
                return

            payload = await resp.json()
            texts = payload["transcriptions"]
            if not texts:
                print(f"No transcriptions yet for session {session_id}")
                return

            print(f"Transcriptions for session {session_id}:")
            for position, text in enumerate(texts, start=1):
                print(f" {position}: {text}")
async def cleanup_session(base_url: str, session_id: str):
    """Delete one session on the server and print the outcome.

    Sends ``DELETE {base_url}/sessions/{session_id}``. Prints the server's
    ``message`` on a 200 response, otherwise the HTTP status.
    """
    async with aiohttp.ClientSession() as http:
        async with http.delete(f"{base_url}/sessions/{session_id}") as resp:
            if resp.status != 200:
                print(f"Failed to cleanup session: {resp.status}")
                return

            body = await resp.json()
            print(f"Cleaned up session: {body['message']}")
async def main():
    """Run the demo client: create a session, stream audio, then clean up.

    Creates one session with the "small" model, prints the session list and
    the new session's status, then runs the audio generator and the WebSocket
    streamer concurrently. Errors are printed rather than raised. The session
    is deleted in ``finally``, but only if one was actually created.
    """
    print("Multi-Client, Multi-Model Transcription Server Demo")
    print("=" * 50)

    # Wait a bit for server to be ready
    await asyncio.sleep(1)

    # Bound before the try so ``finally`` can tell whether a session exists.
    session_id = None
    try:
        session_id = await create_session_with_model(API_BASE_URL, "small")
        if session_id is None:
            # Explicit raise instead of ``assert``, which is stripped under -O.
            raise RuntimeError("Failed to create session")

        # List all sessions
        await list_sessions(API_BASE_URL)

        # Get status for the new session
        await get_session_status(API_BASE_URL, session_id)

        # Create generator config and manager
        event_bus = EventBus()
        generator_config = GeneratorConfig()
        generator_manager = AudioManager(event_bus)
        generator = InputStreamGenerator(generator_config, event_bus)

        print("InputStreamGenerator created")
        print("AudioManager created")
        print("Starting audio processing...")

        audio_task = asyncio.create_task(generator.process_audio())
        print("Audio processing task started")

        stream_task = asyncio.create_task(stream_audio(session_id, generator_manager))
        print("WebSocket streaming task started")

        # Run both tasks; surface their failures instead of dropping them.
        results = await asyncio.gather(audio_task, stream_task, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"Task error: {result}")
    except Exception as e:
        print(f"Demo error: {e}")
    finally:
        # Cleanup; skipped when no session was created, so no DELETE is sent for None.
        if session_id is not None:
            try:
                await cleanup_session(API_BASE_URL, session_id)
            except Exception as e:
                print(f"Failed to cleanup session: {e}")
if __name__ == "__main__":
    # Script entry point: print usage hints, then run the demo client.
    print("Starting client...")
    print("Make sure the TranscriptionServer is running on localhost:8000")
    print("Press Ctrl+C to stop\n")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nClient stopped by user")
    except Exception as e:
        # Fixed the "CLient" typo in this user-facing message.
        print(f"Client failed: {e}")
Note
The client will read the audio data from the input stream and send it to the server for transcription.
The input stream is provided by a custom Python module, InputStreamGenerator, that reads audio data from a file or a microphone.