diff --git a/src/soundevent/audio/io.py b/src/soundevent/audio/io.py index f56cc6a..43ade63 100644 --- a/src/soundevent/audio/io.py +++ b/src/soundevent/audio/io.py @@ -4,12 +4,15 @@ """ import os -from typing import Optional, Tuple +from typing import Dict, Optional, Tuple import numpy as np +import soundfile as sf import xarray as xr -from scipy.io import wavfile +from soundevent.audio.chunks import parse_into_chunks +from soundevent.audio.media_info import extract_media_info_from_chunks +from soundevent.audio.raw import RawData from soundevent.data.clips import Clip from soundevent.data.recordings import Recording @@ -18,6 +21,17 @@ "load_recording", ] +PCM_SUBFORMATS_MAPPING: Dict[Tuple[int, int], str] = { + (1, 16): "PCM_16", + (1, 24): "PCM_24", + (1, 32): "PCM_32", + (1, 8): "PCM_U8", + (3, 32): "FLOAT", + (3, 64): "DOUBLE", + (6, 8): "ALAW", + (7, 8): "ULAW", +} + def load_audio( path: os.PathLike, @@ -42,30 +56,46 @@ def load_audio( samplerate : int The sample rate of the audio file in Hz. - """ - if offset == 0 and samples is None: - samplerate, data = wavfile.read(path, mmap=False) - else: - samplerate, mmap = wavfile.read(path, mmap=True) - - if samples is None: - end_index = None - else: - end_index = offset + samples - - data = mmap[offset:end_index] - # Add channel dimension if necessary - if data.ndim == 1: - data = data[:, None] - - # Convert to float if necessary - if data.dtype == "int16": - data = data.astype("float32") / np.iinfo("int16").max - if data.dtype == "int32": - data = data.astype("float32") / np.iinfo("int32").max - - return data, samplerate + """ + if samples is None: + samples = -1 + + with open(path, "rb") as fp: + chunks = parse_into_chunks(fp) + + # Extract the media information from the fmt chunk. + fmt = chunks.subchunks["fmt "] + media_info = extract_media_info_from_chunks(fp, fmt) + + # Get the subformat for the soundfile library to + # read the audio data. + subformat = PCM_SUBFORMATS_MAPPING.get( + (media_info.audio_format, media_info.bit_depth) + ) + if subformat is None: + raise ValueError( + f"Unsupported audio format: {media_info.audio_format} " + f"with bit depth {media_info.bit_depth}." + "Valid formats are: " + f"{PCM_SUBFORMATS_MAPPING.keys()}." + ) + + # Position the file pointer at the start of the data chunk. + data = chunks.subchunks["data"] + raw = RawData(fp, data) + + return sf.read( + raw, + start=offset, + frames=samples, + dtype="float32", + always_2d=True, + format="RAW", + subtype=subformat, + samplerate=media_info.samplerate, + channels=media_info.channels, + ) def load_recording(recording: Recording) -> xr.DataArray: diff --git a/src/soundevent/audio/media_info.py b/src/soundevent/audio/media_info.py index 53e3df8..bc1cf28 100644 --- a/src/soundevent/audio/media_info.py +++ b/src/soundevent/audio/media_info.py @@ -2,9 +2,9 @@ import hashlib import os from dataclasses import dataclass -from typing import Union +from typing import IO, Union -from soundevent.audio.chunks import parse_into_chunks +from soundevent.audio.chunks import Chunk, parse_into_chunks __all__ = [ "MediaInfo", @@ -16,6 +16,36 @@ PathLike = Union[os.PathLike, str] +@dataclass +class FormatInfo: + """Information stored in the format chunk.""" + + audio_format: int + """Format code for the waveform audio data.""" + + bit_depth: int + """Bit depth.""" + + samplerate: int + """Sample rate in Hz.""" + + channels: int + """Number of channels.""" + + byte_rate: int + """Byte rate. + + byte_rate = samplerate * channels * bit_depth/8 + """ + + block_align: int + """Block align. + + The number of bytes for one sample including all channels. + block_align = channels * bit_depth/8 + """ + + @dataclass class MediaInfo: """Media information.""" @@ -39,6 +69,50 @@ class MediaInfo: """Number of channels.""" +def extract_media_info_from_chunks( + fp: IO[bytes], + fmt_chunk: Chunk, +) -> FormatInfo: + """Return the media information from the fmt chunk. + + Parameters + ---------- + fp : BytesIO + File pointer to the WAV file. + + chunk : Chunk + The fmt chunk. + + Returns + ------- + MediaInfo + + Notes + ----- + The structure of the format chunk is described in + (WAV PCM soundfile format)[http://soundfile.sapp.org/doc/WaveFormat/]. + """ + # Go to the start of the fmt chunk after the chunk id and + # chunk size. + fp.seek(fmt_chunk.position + 8) + + audio_format = int.from_bytes(fp.read(2), "little") + channels = int.from_bytes(fp.read(2), "little") + samplerate = int.from_bytes(fp.read(4), "little") + byte_rate = int.from_bytes(fp.read(4), "little") + block_align = int.from_bytes(fp.read(2), "little") + bit_depth = int.from_bytes(fp.read(2), "little") + + return FormatInfo( + audio_format=audio_format, + bit_depth=bit_depth, + samplerate=samplerate, + channels=channels, + byte_rate=byte_rate, + block_align=block_align, + ) + + def get_media_info(path: PathLike) -> MediaInfo: """Return the media information from the WAV file. @@ -65,37 +139,27 @@ def get_media_info(path: PathLike) -> MediaInfo: with open(path, "rb") as wav: chunk = parse_into_chunks(wav) - # Get info from the fmt chunk. The fmt chunk is the first - # subchunk of the root chunk. - fmt_chunk = chunk.subchunks["fmt "] - - # Go to the start of the fmt chunk after the chunk id and - # chunk size. - wav.seek(fmt_chunk.position + 8) - - audio_format = int.from_bytes(wav.read(2), "little") - channels = int.from_bytes(wav.read(2), "little") - samplerate = int.from_bytes(wav.read(4), "little") - wav.read(4) # Skip byte rate. - wav.read(2) # Skip block align. - bit_depth = int.from_bytes(wav.read(2), "little") + # Get info from the fmt chunk + fmt = chunk.subchunks["fmt "] + fmt_info = extract_media_info_from_chunks(wav, fmt) # Get size of data chunk. Notice that the size of the data # chunk is the size of the data subchunk divided by the number # of channels and the bit depth. data_chunk = chunk.subchunks["data"] - samples = 8 * data_chunk.size // (channels * bit_depth) - - duration = samples / samplerate - - return MediaInfo( - audio_format=audio_format, - bit_depth=audio_format, - samplerate_hz=samplerate, - channels=channels, - samples=samples, - duration_s=duration, - ) + samples = ( + 8 * data_chunk.size // (fmt_info.channels * fmt_info.bit_depth) + ) + duration = samples / fmt_info.samplerate + + return MediaInfo( + audio_format=fmt_info.audio_format, + bit_depth=fmt_info.bit_depth, + samplerate_hz=fmt_info.samplerate, + duration_s=duration, + samples=samples, + channels=fmt_info.channels, + ) BUFFER_SIZE = 65536 diff --git a/src/soundevent/audio/raw.py b/src/soundevent/audio/raw.py new file mode 100644 index 0000000..0895195 --- /dev/null +++ b/src/soundevent/audio/raw.py @@ -0,0 +1,136 @@ +"""Raw Audio module. + +This module contains the RawData class which is a +file-like object that wraps the data buffer of a +WAV file and is meant to replicate the structure +of a RAW audio file. + +A RAW audio file is a file that contains only the +contents of the data chunk of a WAV file without +any of the other chunks. + +Handling RAW audio files is useful as WAV files +can come with various chunks that are not standard, +such as the Guano metadata chunk. This unexpected +chunks can sometimes cause problems when reading +the WAV file with other libraries and so it is +useful to be able to read only the data chunk of +a WAV file. +""" + +import os +from io import BufferedIOBase, RawIOBase +from typing import Optional + +from soundevent.audio.chunks import Chunk + + +class RawData(RawIOBase): + """A file-like object that wraps a the data buffer of a WAV file. + + This file-like object only contains the data buffer of a WAV without any + of the other chunks. + """ + + chunk: Chunk + """The chunk that is being read.""" + + initial_position: int + """The initial position of the file pointer. + + Should point to the start of the data chunk. + """ + + fp: BufferedIOBase + """The file pointer to the WAV file.""" + + size: int + """The size of the data chunk in bytes.""" + + def __init__(self, fp: BufferedIOBase, chunk: Chunk): + """Initialize a new RawData object.""" + self.chunk = chunk + self.fp = fp + self.size = chunk.size + + # Position the file pointer at the start of the data chunk. + # We add 8 to the position to account for the chunk id and + # chunk size. + self.initial_position = chunk.position + 8 + + # Position the file pointer at the start of the data chunk. + self.fp.seek(self.initial_position) + + assert self.fp.tell() == self.initial_position + + def close(self) -> None: + """Close the file.""" + self.fp.close() + + @property + def closed(self) -> bool: + """Return True if the file is closed.""" + return self.fp.closed + + def fileno(self) -> int: + """Return the file descriptor.""" + return self.fp.fileno() + + def flush(self) -> None: + """Flush the file.""" + + def isatty(self) -> bool: + """Return True if the file is a tty.""" + return False + + def readable(self) -> bool: + """Return True if the file is readable.""" + return True + + def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int: + """Seek the file pointer.""" + if whence == os.SEEK_SET: + return self.fp.seek( + self.initial_position + offset, + os.SEEK_SET, + ) + + if whence == os.SEEK_END: + return self.fp.seek( + self.initial_position + self.size + offset, + os.SEEK_SET, + ) + + return self.fp.seek(offset, whence) + + def seekable(self) -> bool: + """Return True if the file is seekable.""" + return True + + def tell(self) -> int: + """Return the file pointer position.""" + return self.fp.tell() - self.initial_position + + def truncate(self, size: Optional[int] = None, /) -> int: + """Truncate the file.""" + if size is None: + size = self.tell() + return self.fp.truncate(size) + + def writable(self) -> bool: + """Return True if the file is writable.""" + return False + + def read(self, size: int = -1, /) -> bytes: + """Read bytes from the file.""" + if size == -1: + size = self.size - self.tell() + return self.fp.read(size) + + def readall(self, /) -> bytes: + """Read all bytes from the file.""" + return self.fp.read(self.size - self.tell()) + + def readinto(self, b, /): + """Read bytes into a buffer.""" + return self.fp.readinto(b) diff --git a/tests/test_audio/24bitdepth.wav b/tests/test_audio/24bitdepth.wav new file mode 100644 index 0000000..69efce5 Binary files /dev/null and b/tests/test_audio/24bitdepth.wav differ diff --git a/tests/test_audio/test_audio.py b/tests/test_audio/test_audio.py index bf027b5..2058cb2 100644 --- a/tests/test_audio/test_audio.py +++ b/tests/test_audio/test_audio.py @@ -1,8 +1,8 @@ """Test suite for audio loading functions.""" from pathlib import Path +from uuid import uuid4 import numpy as np -from uuid import uuid4 import pytest import xarray as xr from hypothesis import HealthCheck, given, settings @@ -148,3 +148,24 @@ def test_read_clip( clip_wav.data, rec_xr.sel(time=clip_wav.time, method="nearest").data, ) + + +def test_can_load_clip_from_24_bit_depth_wav(): + """Test loading a 24 bit depth wav file.""" + # Arrange + path = Path(__file__).parent / "24bitdepth.wav" + + recording = data.Recording.from_file(path) + start_time = 0.5 + end_time = 1 + duration = end_time - start_time + clip = data.Clip( + recording=recording, + start_time=start_time, + end_time=end_time, + ) + + # Act + wav = load_clip(clip) + + assert wav.shape == (recording.samplerate * duration, recording.channels) diff --git a/tests/test_audio/test_raw.py b/tests/test_audio/test_raw.py new file mode 100644 index 0000000..c5af0a4 --- /dev/null +++ b/tests/test_audio/test_raw.py @@ -0,0 +1,65 @@ +"""Test suite for the RawData class.""" + +from pathlib import Path + +import pytest +import soundfile as sf +from hypothesis import HealthCheck, given, settings +from hypothesis import strategies as st + +from soundevent.audio.chunks import parse_into_chunks +from soundevent.audio.raw import RawData + + +def test_raw_data_chunk_has_correct_number_of_channels(random_wav): + """Test that the RawData chunk has the correct number of channels.""" + # Arrange + samplerate = 16_000 + duration = 1 + channels = 2 + path = random_wav( + samplerate=samplerate, + duration=duration, + channels=channels, + ) + + # Act + with open(path, "rb") as fp: + chunks = parse_into_chunks(fp) + data = RawData(fp, chunks.subchunks["data"]) + + with sf.SoundFile( + data, + samplerate=samplerate, + channels=channels, + subtype="FLOAT", + format="RAW", + ) as fp: + assert fp.channels == 2 + + +def test_raw_data_chunk_has_correct_size(random_wav): + """Test that the RawData chunk has the correct size.""" + # Arrange + samplerate = 16_000 + duration = 1 + channels = 2 + path = random_wav( + samplerate=samplerate, + duration=duration, + channels=channels, + ) + + # Act + with open(path, "rb") as fp: + chunks = parse_into_chunks(fp) + data = RawData(fp, chunks.subchunks["data"]) + + with sf.SoundFile( + data, + samplerate=samplerate, + channels=channels, + subtype="FLOAT", + format="RAW", + ) as fp: + assert len(fp) == int(samplerate * duration)