1+ import numpy as np
2+ from abc import ABC , abstractmethod
3+ from collections import deque
4+ import sounddevice as sd
5+ from typing import Optional
6+
class AudioDevice(ABC):
    """
    Interface for audio input sources consumed by ShadertoyChannelMusic.

    Concrete devices must implement `get_samples` and `get_rate`. The other
    methods are optional hooks whose defaults are no-ops or constants.
    """

    @abstractmethod
    def get_samples(self, num_samples: int) -> np.ndarray:
        """
        Fetch the newest audio samples.

        Args:
            num_samples (int): How many samples the caller wants.

        Returns:
            np.ndarray: A 1D float32 array of length `num_samples` (values
                ideally within [-1, 1]). When fewer samples are available
                than requested, implementations should pad the result
                (e.g., with zeros).
        """

    @abstractmethod
    def get_rate(self) -> int:
        """
        Report the sample rate of the device in Hz.
        """

    def start(self) -> None:
        """
        Begin capturing (e.g., open a microphone stream).
        Optional hook: the default implementation does nothing.
        """
        return None

    def stop(self) -> None:
        """
        End capturing (e.g., close a microphone stream).
        Optional hook: the default implementation does nothing.
        """
        return None

    def is_ready(self) -> bool:
        """
        Tell whether the device is ready / has enough data.
        Optional hook: the default implementation is always ready.
        """
        return True

    def gain(self) -> float:
        """
        Report the gain factor associated with this device.
        Optional hook: the default implementation returns 1.0 (no gain).
        """
        return 1.0
61+
class FIFOPushAudioDevice(AudioDevice):
    """
    An AudioDevice backed by an internal bounded FIFO buffer (deque).

    Samples are fed in with `push_samples`; `get_samples` returns the most
    recent ones. Once the buffer holds `max_buffer_samples` samples, every
    newly pushed sample evicts the oldest one.
    """

    # Minimum number of buffered samples before `is_ready` reports True
    # (512 is the typical fft_input_size requested by consumers).
    _MIN_READY_SAMPLES = 512

    def __init__(self, rate: int, max_buffer_samples: int, gain: float = 0.6):
        """
        Args:
            rate (int): Sample rate in Hz reported by `get_rate`.
            max_buffer_samples (int): Capacity of the FIFO buffer in samples.
            gain (float): Gain factor reported by `gain`.
        """
        self._gain = gain
        self._rate = rate
        # Bounded deque: when full, extending it drops the oldest samples.
        self._buffer = deque(maxlen=max_buffer_samples)

    def get_rate(self) -> int:
        """Returns the sample rate in Hz given at construction."""
        return self._rate

    def push_samples(self, new_samples: np.ndarray):
        """
        Appends new audio samples to the internal buffer.

        Args:
            new_samples: Array-like of numeric samples. It is converted to
                float32 and flattened, so plain sequences and arrays of
                shape (n, 1) are accepted as well as 1D arrays.
        """
        flat = np.asarray(new_samples, dtype=np.float32).ravel()
        self._buffer.extend(flat)

    def get_samples(self, num_samples: int) -> np.ndarray:
        """
        Returns the most recent `num_samples` from the buffer.

        Args:
            num_samples (int): Number of samples wanted (must be >= 0).

        Returns:
            np.ndarray: 1D float32 array of exactly `num_samples` entries.
                If the buffer holds fewer samples, the result is padded
                with leading zeros.

        Raises:
            ValueError: If `num_samples` is negative.
        """
        if num_samples < 0:
            raise ValueError(f"num_samples must be non-negative, got {num_samples}")

        # Pin the dtype: an empty deque would otherwise produce float64.
        snapshot = np.array(self._buffer, dtype=np.float32)
        available = snapshot.size

        if num_samples <= available:
            # Slice from an explicit start index; `snapshot[-0:]` would
            # return the whole buffer instead of an empty array.
            return snapshot[available - num_samples:]

        # Not enough data yet: right-align what we have over leading zeros.
        padded = np.zeros(num_samples, dtype=np.float32)
        if available:
            padded[-available:] = snapshot
        return padded

    def is_ready(self) -> bool:
        """Returns True once at least `_MIN_READY_SAMPLES` samples are buffered."""
        return len(self._buffer) >= self._MIN_READY_SAMPLES

    def gain(self) -> float:
        """Returns the gain factor given at construction."""
        return self._gain
103+
class NullAudioDevice(AudioDevice):
    """A silent AudioDevice: every request yields all-zero samples."""

    def __init__(self, rate: int = 44100):
        """
        Args:
            rate (int): Sample rate in Hz reported by `get_rate`.
        """
        self._rate = rate

    def get_samples(self, num_samples: int) -> np.ndarray:
        """Returns `num_samples` float32 zeros."""
        silence = np.zeros(num_samples, dtype=np.float32)
        return silence

    def get_rate(self) -> int:
        """Returns the configured sample rate in Hz."""
        return self._rate
114+
class NoiseAudioDevice(AudioDevice):
    """An AudioDevice whose samples are uniform random noise."""

    def __init__(self, rate: int = 44100):
        """
        Args:
            rate (int): Sample rate in Hz reported by `get_rate`.
        """
        self._rate = rate

    def get_samples(self, num_samples: int) -> np.ndarray:
        """Returns `num_samples` float32 values drawn uniformly between -1 and 1."""
        noise = np.random.uniform(-1, 1, num_samples)
        return noise.astype(np.float32)

    def get_rate(self) -> int:
        """Returns the configured sample rate in Hz."""
        return self._rate
125+
class MicrophoneAudioDevice(FIFOPushAudioDevice):
    """
    An AudioDevice implementation that reads live audio from a system input device
    using the `sounddevice` library. Provides single-channel (mono) float32 samples.
    """

    def __init__(self,
                 sample_rate: int = 44100,
                 buffer_duration_seconds: float = 5.0,
                 chunk_size: int = 1024,
                 device_index: Optional[int] = None,
                 gain: float = 0.6):
        """
        Initializes the MicrophoneAudioDevice. No stream is opened until
        `start()` is called.

        Args:
            sample_rate (int): The desired sample rate in Hz.
            buffer_duration_seconds (float): The duration of the internal FIFO buffer
                                             in seconds. Determines the maximum amount
                                             of recent audio history stored.
            chunk_size (int): The number of samples to read from the audio device
                              in each callback chunk. Affects latency and overhead.
            device_index (int | None): The index of the input audio device to use.
                                       If None, the default system input device is used.
                                       Use `sounddevice.query_devices()` to list devices.
            gain (float): The gain factor to apply to the audio fft data.
        """
        max_samples = int(sample_rate * buffer_duration_seconds)
        super().__init__(rate=sample_rate, max_buffer_samples=max_samples, gain=gain)

        self._chunk_size = chunk_size
        self._device_index = device_index
        self._stream = None

    def _audio_callback(self, indata: np.ndarray, frames: int, time, status: sd.CallbackFlags):
        """
        This function is called by sounddevice in a separate thread
        whenever new audio data is available.

        Args:
            indata (np.ndarray): Captured block, indexed as (frames, channels).
            frames (int): Number of frames in `indata` (unused here).
            time: Timing information supplied by sounddevice (unused here).
            status (sd.CallbackFlags): Flags signalling underflow/overflow.
        """
        if status:
            print(f"Warning: Sounddevice callback status: { status } ")
            if status.input_underflow:
                print("Warning: Input underflow detected!")
            if status.input_overflow:
                print("Warning: Input overflow detected! Buffer might be too small or processing too slow.")

        # The stream is opened with channels=1, so indata should have shape
        # (frames, 1); the buffer needs 1D data.
        if indata.shape[1] != 1:
            # This shouldn't happen if channels=1 works, but as a fallback
            # down-mix all channels to mono.
            mono_samples = indata.mean(axis=1).astype(np.float32)
        else:
            mono_samples = indata[:, 0]  # Extract the single channel, makes it 1D

        # Push the mono samples into the deque buffer (managed by parent class).
        # This relies on deque.extend() being thread-safe in CPython.
        self.push_samples(mono_samples)

    def _close_stream(self):
        """Closes the current stream (if any) and forgets it; never raises."""
        stream, self._stream = self._stream, None
        if stream is None:
            return
        try:
            stream.close()
        except Exception as e:
            print(f"Error: stopping audio stream: { e } ")

    def start(self):
        """
        Starts the audio stream from the microphone.

        If a stream is already running, a warning is printed and nothing
        else happens.

        Raises:
            Exception: Whatever sounddevice raised while opening or starting
                the stream. Any partially opened stream is closed first.
        """
        if self._stream is not None and self._stream.active:
            print("Warning: Stream already running. Call stop() first if you want to restart.")
            return

        # A leftover inactive stream must be closed before it is replaced,
        # otherwise its handle would be leaked.
        self._close_stream()

        try:
            self._stream = sd.InputStream(
                samplerate=self.get_rate(),
                blocksize=self._chunk_size,
                device=self._device_index,
                channels=1,  # Request mono directly
                dtype=np.float32,  # Request 32-bit float
                callback=self._audio_callback,
                latency='low'  # Or 'high' for potentially more stable streaming
            )
            self._stream.start()
        except Exception as e:
            print(f"Error: Failed to start audio stream: { e } ")
            # If the stream was opened but failed to start, close it rather
            # than just dropping the reference; leaves self._stream as None.
            self._close_stream()
            # Re-raise with the original traceback so callers can react.
            raise

    def stop(self):
        """
        Stops and closes the audio stream. Does nothing if no stream exists.

        Errors while stopping are printed, not raised, and the stream is
        closed and forgotten regardless.
        """
        if self._stream is None:
            return

        try:
            if self._stream.active:
                self._stream.stop()
        except Exception as e:
            print(f"Error: stopping audio stream: { e } ")
        finally:
            # Always close, even when stop() itself failed, and mark the
            # device as stopped.
            self._close_stream()

    # Optional: Override is_ready to also check the stream status
    def is_ready(self) -> bool:
        """Check if the stream is active and the buffer has sufficient data."""
        parent_ready = super().is_ready()
        stream_active = self._stream is not None and self._stream.active
        # TODO - allow getting samples
        # even if stream just stopped but buffer is full.
        return stream_active and parent_ready
0 commit comments