from typing import List, Tuple
from asyncio import Queue
import torch
import asyncio
from whisper_web.events import (
    EventBus,
    AudioChunkReceived,
    AudioChunkGenerated,
    AudioChunkNum,
    TranscriptionCompleted,
    TranscriptionUpdated,
)
from whisper_web.types import Transcription, AudioChunk
class TranscriptionManager:
    """Event-driven transcription manager."""
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # State
        self.transcriptions: List[Transcription] = []
        self.current_transcription: str = ""
        self.audio_queue: Queue[Tuple[torch.Tensor, bool]] = Queue(maxsize=128)
        self.processed_chunks: int = 0
        self.num_chunks: int = 0
        # Subscribe to events
        self.event_bus.subscribe(AudioChunkReceived, self._handle_audio_chunk)  # type: ignore
        self.event_bus.subscribe(TranscriptionCompleted, self._handle_transcription_completed)  # type: ignore
    async def _handle_audio_chunk(self, event: AudioChunkReceived) -> None:
        """Handle incoming audio chunks."""
        if event.chunk.data.numel() > 0:
            await self.audio_queue.put((event.chunk.data, event.is_final))
    async def _handle_transcription_completed(self, event: TranscriptionCompleted) -> None:
        """Handle completed transcriptions."""
        self.current_transcription = event.transcription.text
        self.processed_chunks += 1
        if len(self.transcriptions) < 1 or event.is_final:
            self.transcriptions.append(event.transcription)
        # Publish transcription update event
        await self.event_bus.publish(TranscriptionUpdated(current_text=self.current_transcription, full_text=self.full_transcription))
    async def run_inference(self, model) -> None:
        """Main inference loop: feeds queued audio to the model.

        The model is decoupled from this manager; it only needs to be an async
        callable that accepts the ``(tensor, is_final)`` tuple taken from the queue.
        """
        while True:
            try:
                # Poll with a timeout so the loop stays responsive to cancellation.
                audio_data: Tuple[torch.Tensor, bool] = await asyncio.wait_for(self.audio_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            await model(audio_data)
    @property
    def queue_size(self) -> int:
        return self.audio_queue.qsize()
    @property
    def full_transcription(self) -> str:
        """Join all stored transcriptions with the current in-progress text."""
        parts = [transcription.text for transcription in self.transcriptions]
        if self.current_transcription:
            parts.append(self.current_transcription)
        return " ".join(parts)
    @property
    def stats(self) -> dict:
        return {
            "queue_size": self.queue_size,
            "processed_chunks": self.processed_chunks,
            "num_transcriptions": len(self.transcriptions),
        }
    def clear_audio_queue(self) -> None:
        """Discard any audio chunks still waiting in the queue."""
        while not self.audio_queue.empty():
            self.audio_queue.get_nowait()
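
    # Usage sketch (illustrative, not part of the module): the inference loop never
    # touches the model's internals; it only awaits an async callable with the queued
    # (tensor, is_final) tuple, so any wrapper with that call signature can be plugged in:
    #
    #   manager = TranscriptionManager(event_bus)
    #   asyncio.create_task(manager.run_inference(my_async_model))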
 
class AudioManager:
    """Event-driven audio chunk manager."""
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # State
        self.audio_chunk_queue: Queue[Tuple[AudioChunk, bool]] = Queue(maxsize=128)
        self.processed_chunks: int = 0
        self.num_chunks: int = 0
        # Subscribe to events
        self.event_bus.subscribe(AudioChunkGenerated, self._handle_generated_audio_chunk)  # type: ignore
        self.event_bus.subscribe(AudioChunkNum, self._handle_num_chunks_updated)  # type: ignore
    async def _handle_generated_audio_chunk(self, event: AudioChunkGenerated) -> None:
        """Handle incoming audio chunks."""
        try:
            audio_chunk = event.chunk
            is_final = event.is_final
            self.processed_chunks += 1
            if audio_chunk.data.numel() > 0:
                await self.audio_chunk_queue.put((audio_chunk, is_final))
        except Exception as e:
            print(f"Error handling audio chunk: {e}")
    async def _handle_num_chunks_updated(self, event: AudioChunkNum) -> None:
        """Handle updates to the expected total number of audio chunks."""
        self.num_chunks = event.num_chunks
    async def get_next_audio_chunk(self) -> Tuple[AudioChunk, bool] | None:
        """Retrieve the next available audio chunk from the queue."""
        try:
            audio_chunk, is_final = await asyncio.wait_for(self.audio_chunk_queue.get(), timeout=1.0)
            return audio_chunk, is_final
        except asyncio.TimeoutError:
            return None
        except Exception as e:
            print(f"Error getting audio chunk: {e}")
            return None 
    @property
    def queue_size(self) -> int:
        return self.audio_chunk_queue.qsize()
    @property
    def stats(self) -> dict:
        return {
            "queue_size": self.queue_size,
            "processed_chunks": self.processed_chunks,
        }
    def clear_audio_queue(self) -> None:
        """Discard any audio chunks still waiting in the queue."""
        while not self.audio_chunk_queue.empty():
            self.audio_chunk_queue.get_nowait()
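

if __name__ == "__main__":
    # Minimal demo sketch, not part of the library API. It assumes ``EventBus()`` can
    # be constructed without arguments; ``dummy_model`` is a hypothetical stand-in for
    # a real Whisper model wrapper. The event dataclass constructors are not shown in
    # this module, so the demo feeds the transcription queue directly instead of
    # publishing AudioChunkReceived events.

    async def dummy_model(audio_data: Tuple[torch.Tensor, bool]) -> None:
        tensor, is_final = audio_data
        print(f"model received {tensor.numel()} samples (final={is_final})")

    async def demo() -> None:
        event_bus = EventBus()  # assumption: no-argument constructor
        transcription_manager = TranscriptionManager(event_bus)
        audio_manager = AudioManager(event_bus)

        # Feed one second of silence at 16 kHz straight into the inference queue.
        await transcription_manager.audio_queue.put((torch.zeros(16000), True))

        # Run the inference loop just long enough to drain the queue, then cancel it.
        inference_task = asyncio.create_task(transcription_manager.run_inference(dummy_model))
        await asyncio.sleep(0.1)
        inference_task.cancel()
        try:
            await inference_task
        except asyncio.CancelledError:
            pass

        # Nothing was published on the bus, so the audio manager times out and returns None.
        print(await audio_manager.get_next_audio_chunk())
        print(transcription_manager.stats, audio_manager.stats)

    asyncio.run(demo())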