Source code for pyquist.audio

"""Raw audio sample manipulation via Numpy-backed containers.

Everything in this module centers on :class:`Audio`, a thin wrapper around a
2D ``float32`` numpy array shaped ``(num_samples, num_channels)`` plus a
``sample_rate`` in Hz. By convention, sample values in ``[-1.0, 1.0]`` are
digital full-scale; values outside that range are valid in memory but clip on
playback or when written to most file formats.

Construct one from a numpy array, or load existing audio from disk or the web::

    import numpy as np
    import pyquist as pq

    sr = 44100
    t = np.arange(sr) / sr
    tone = pq.Audio(0.5 * np.sin(2 * np.pi * 440 * t), sample_rate=sr)  # 1s of A4

    riff = pq.Audio.from_file("guitar.wav")
    drums = pq.Audio.from_url("https://example.com/drums.mp3")

``Audio`` behaves like a numpy array where it can — it supports indexing,
slicing, ``len()``, and elementwise arithmetic (``+``, ``-``, ``*``, ``/``,
in-place variants), all returning ``Audio``::

    mix = riff + drums[: len(riff)]  # sum the overlapping region
    mix *= 0.5  # halve the amplitude in place

On top of that it offers music-specific helpers that return new ``Audio``
objects::

    clip = mix.as_mono().segment(offset=1.0, duration=3.0).resample(8000)
    clip.normalize(peak_dbfs=-1.0)
    clip.write("clip.wav")

See :meth:`Audio.zeros` for an empty destination buffer and
:meth:`Audio.concatenate` to join buffers end to end. To turn musical events
into ``Audio``, see :mod:`pyquist.score`.
"""

import pathlib
from io import BytesIO
from typing import IO, Optional, Union
from urllib.request import urlopen

import numpy as np
import soundfile as sf
import soxr

from .helper import db_to_amplitude


[docs] class Audio: """A wrapper around a 2D float32 numpy array of audio samples. The two primary attributes are :attr:`samples` (a ``float32`` array shaped ``(num_samples, num_channels)``) and :attr:`sample_rate` (Hz, or ``None`` for buffers without a defined rate). By convention, sample values in ``[-1.0, 1.0]`` correspond to digital full-scale amplitude; values outside this range are valid in memory but will clip when sent to playback or written to most file formats. Example: >>> import numpy as np >>> import pyquist as pq >>> sr = 44100 >>> t = np.arange(sr) / sr >>> audio = pq.Audio(np.sin(2 * np.pi * 440 * t), sample_rate=sr) >>> pq.play(audio) """ def __init__( self, samples: np.ndarray, sample_rate: Optional[int] = None, ): """Wraps an existing numpy array as ``Audio``. Args: samples: A numpy array of samples. Accepted as 0-D, 1-D, or 2-D (see the ``samples`` setter for shape normalization). Must be ``float32`` or ``float64`` (the latter is auto-converted). sample_rate: Optional sample rate in Hz; ``None`` for unspecified (e.g. when used as a real-time block buffer). """ self.samples = samples self.sample_rate = sample_rate
[docs] @classmethod def zeros( cls, num_samples: int, num_channels: int, sample_rate: Optional[int] = None, ) -> "Audio": """Creates a silent (zero-filled) ``Audio`` of the given shape. Useful as a destination buffer that you fill in via ``audio.samples`` or via in-place arithmetic. Args: num_samples: Number of samples per channel. Must be ``>= 0``. num_channels: Number of channels (1 for mono, 2 for stereo). Must be ``>= 0``. sample_rate: Optional sample rate in Hz. """ if num_samples < 0: raise ValueError("num_samples must be non-negative.") if num_channels < 0: raise ValueError("num_channels must be non-negative.") return cls( np.zeros((num_samples, num_channels), dtype=np.float32), sample_rate=sample_rate, )
[docs] @classmethod def from_file(cls, file: Union[str, pathlib.Path, IO]) -> "Audio": """Loads an ``Audio`` from a file on disk or a file-like object. Decoding is delegated to ``soundfile`` (libsndfile), which supports WAV, FLAC, OGG, MP3, and most common formats. The file's native sample rate is preserved; channels remain in their original order. Use :meth:`resample` to change the rate after loading. Raises :class:`FileNotFoundError` (with the offending path) when ``file`` is a path that doesn't exist — clearer than libsndfile's generic ``"System error"`` message. """ # Pre-check path-like inputs so a missing file produces a useful error # instead of LibsndfileError: "Error opening ...: System error.". if isinstance(file, (str, pathlib.Path)): path = pathlib.Path(file) if not path.exists(): raise FileNotFoundError(f"Audio file not found: {path}.") samples, sample_rate = sf.read(file) return cls(samples, sample_rate=sample_rate)
[docs] @classmethod def from_url(cls, url: str) -> "Audio": """Downloads an audio file from a URL and loads it as ``Audio``. The full response is buffered in memory before decoding. """ return cls.from_file(BytesIO(urlopen(url).read()))
[docs] @classmethod def concatenate(cls, audios: "list[Audio]") -> "Audio": """Joins a sequence of ``Audio`` end-to-end along the sample axis. All inputs must share a ``num_channels`` and a ``sample_rate``; otherwise ``ValueError`` is raised. The list must be non-empty. Args: audios: A non-empty list of ``Audio`` to join in order. """ audios = list(audios) if not audios: raise ValueError("concatenate requires at least one Audio.") sample_rates = {a.sample_rate for a in audios} if len(sample_rates) != 1: raise ValueError(f"Inconsistent sample rates: {sample_rates}.") channel_counts = {a.num_channels for a in audios} if len(channel_counts) != 1: raise ValueError(f"Inconsistent channel counts: {channel_counts}.") samples = np.concatenate([a.samples for a in audios], axis=0) return cls(samples, sample_rate=sample_rates.pop())
# --- Core attributes ---------------------------------------------------- @property def samples(self) -> np.ndarray: """The underlying ``(num_samples, num_channels)`` ``float32`` array. Returned by reference: in-place mutations (``audio.samples[0] = 0``, ``audio.samples *= 0.5``) modify the audio directly. Reassigning the attribute (``audio.samples = new_array``) re-runs validation. """ return self._samples @samples.setter def samples(self, value: np.ndarray) -> None: """Validates and stores ``value`` as the underlying sample array. Three conveniences are applied before validation: * a 0-D array becomes shape ``(1, 1)``; * a 1-D array of length ``n`` becomes shape ``(n, 1)`` (mono); * a ``float64`` array is cast to ``float32``. Anything else with the wrong dtype raises ``TypeError``; arrays with more than 2 dimensions raise ``ValueError``. When ``value`` is already a 2-D ``float32`` array, it is stored by reference (no copy) — this is what allows ``Audio`` to act as a thin view over an externally owned buffer (e.g. the ``outdata`` array in a real-time callback). """ if not isinstance(value, np.ndarray): raise TypeError( f"samples must be a numpy.ndarray, got {type(value).__name__}." ) if value.ndim == 0: value = value[np.newaxis, np.newaxis] elif value.ndim == 1: value = value[:, np.newaxis] elif value.ndim > 2: raise ValueError( f"samples must have shape (num_samples, num_channels); " f"got array with {value.ndim} dimensions." ) if value.dtype == np.float64: value = value.astype(np.float32) if value.dtype != np.float32: raise TypeError(f"samples must have dtype np.float32, got {value.dtype}.") self._samples = value @property def sample_rate(self) -> Optional[int]: """The sample rate in Hz, or ``None`` if unspecified.""" return self._sample_rate @sample_rate.setter def sample_rate(self, value: Optional[int]) -> None: """Sets the sample rate. Accepts a positive ``int`` or ``None``. Non-int values raise ``TypeError``; zero or negative values raise ``ValueError``. """ if value is None: self._sample_rate = None return if not isinstance(value, (int, np.integer)): raise TypeError( f"sample_rate must be int or None, got {type(value).__name__}." ) if value <= 0: raise ValueError(f"sample_rate must be positive, got {value}.") self._sample_rate = int(value) # --- Shape-derived properties ------------------------------------------ @property def num_samples(self) -> int: """Number of samples per channel (``samples.shape[0]``).""" return self._samples.shape[0] @property def num_channels(self) -> int: """Number of channels (``samples.shape[1]``); 1 for mono, 2 for stereo.""" return self._samples.shape[1] @property def shape(self) -> tuple: """Shape of the underlying array: ``(num_samples, num_channels)``.""" return self._samples.shape @property def duration(self) -> float: """Duration of the audio in seconds. Requires sample_rate to be set.""" if self._sample_rate is None: raise ValueError("Cannot compute duration without a sample_rate.") return self.num_samples / self._sample_rate @property def peak_amplitude(self) -> float: """Peak absolute sample value across all samples and channels. This is a linear amplitude (not decibels): ``1.0`` corresponds to digital full scale. Empty audio returns ``0.0``. Use :func:`pyquist.helper.amplitude_to_db` to convert to dBFS. """ if self._samples.size == 0: return 0.0 return float(np.abs(self._samples).max()) # --- Mutation methods ---------------------------------------------------
[docs] def clear(self) -> None: """Fills the audio with silence (zeros) in place. Shape, dtype, and ``sample_rate`` are unchanged. """ self._samples.fill(0.0)
[docs] def segment( self, *, offset: Optional[float] = None, duration: Optional[float] = None, ) -> "Audio": """Returns a new ``Audio`` containing a time-slice of this one. Both ``offset`` and ``duration`` are in seconds and require ``sample_rate`` to be set. Out-of-range values are clamped: a negative ``offset`` is treated as zero, and a ``duration`` that runs past the end is truncated. With both arguments ``None`` this is a no-op that returns ``self``. Args: offset: Start time in seconds. Defaults to the beginning. duration: Length in seconds. Defaults to the rest of the audio. Returns: A new ``Audio`` carrying the same ``sample_rate`` as ``self``. """ if offset is None and duration is None: return self if self._sample_rate is None: raise ValueError("segment() requires a sample_rate.") start = max(0, int((offset or 0.0) * self._sample_rate)) end = ( self.num_samples if duration is None else start + int(duration * self._sample_rate) ) start = min(start, self.num_samples) end = max(start, min(end, self.num_samples)) return Audio(self._samples[start:end, :], sample_rate=self._sample_rate)
[docs] def normalize(self, *, peak_dbfs: float = 0.0, in_place: bool = True) -> "Audio": """Scales the audio so its peak amplitude matches ``peak_dbfs``. ``peak_dbfs`` is measured in decibels relative to digital full scale (dBFS). ``0.0`` means full-scale (peak = 1.0); ``-6.0`` means roughly half full-scale (peak ≈ 0.501); positive values exceed full scale and will clip on playback. Silent audio (all zeros) is returned unchanged. Args: peak_dbfs: Target peak level in dBFS. Defaults to ``0.0``. in_place: If ``True`` (default), modifies and returns ``self``. If ``False``, returns a new ``Audio`` and leaves the original untouched. """ peak = self.peak_amplitude if peak == 0.0: gain = 1.0 else: gain = float(db_to_amplitude(peak_dbfs)) / peak if in_place: self._samples *= gain return self return Audio(self._samples * gain, sample_rate=self._sample_rate)
[docs] def clip(self, *, peak_amplitude: float = 1.0, in_place: bool = True) -> "Audio": """Symmetrically clamps every sample to ``[-peak_amplitude, +peak_amplitude]``. This is a hard clip — samples beyond the threshold are truncated, not scaled. To rescale instead, use :meth:`normalize`. Args: peak_amplitude: Symmetric clip threshold in linear amplitude. Defaults to ``1.0`` (digital full scale). in_place: If ``True`` (default), modifies and returns ``self``. If ``False``, returns a new ``Audio`` and leaves the original untouched. """ clipped = np.clip(self._samples, -peak_amplitude, peak_amplitude) if in_place: self._samples[:] = clipped return self return Audio(clipped, sample_rate=self._sample_rate)
[docs] def as_mono(self) -> "Audio": """Returns a mono (1-channel) version of the audio. Multi-channel audio is mixed down by averaging across channels (mean, not sum), which preserves perceived loudness without risking clipping. If the audio is already mono, returns ``self`` (no copy). """ if self.num_channels == 1: return self mono = self._samples.mean(axis=1, keepdims=True).astype(np.float32) return Audio(mono, sample_rate=self._sample_rate)
[docs] def as_stereo(self) -> "Audio": """Returns a stereo (2-channel) version of the audio. Mono audio is duplicated across both channels (the same signal in L and R). Stereo audio is returned as ``self`` (no copy). Audio with 3 or more channels raises ``ValueError`` — this method does not try to guess a downmix. """ if self.num_channels == 2: return self if self.num_channels == 1: stereo = np.repeat(self._samples, 2, axis=1) return Audio(stereo, sample_rate=self._sample_rate) raise ValueError( f"Cannot convert audio with {self.num_channels} channels to stereo." )
[docs] def resample(self, new_sample_rate: int, **kwargs) -> "Audio": """Returns a new ``Audio`` resampled to ``new_sample_rate``. Resampling is performed by ``soxr`` using a bandlimited sinc filter; extra keyword arguments (e.g. ``quality='VHQ'``) are forwarded to :func:`soxr.resample`. The number of channels is preserved; the number of samples scales by ``new_sample_rate / self.sample_rate``. Raises ``ValueError`` if ``self.sample_rate`` is ``None`` or ``new_sample_rate`` is non-positive. """ if self._sample_rate is None: raise ValueError("Cannot resample without a sample_rate.") if not isinstance(new_sample_rate, (int, np.integer)): raise TypeError("new_sample_rate must be an int.") if new_sample_rate <= 0: raise ValueError("new_sample_rate must be positive.") resampled = soxr.resample( self._samples, self._sample_rate, new_sample_rate, **kwargs ) return Audio(resampled, sample_rate=new_sample_rate)
[docs] def write(self, file: Union[str, IO], **kwargs) -> None: """Writes the audio to a file via ``soundfile``. The output format is inferred from the file extension (``.wav``, ``.flac``, ``.ogg``, ...). Extra keyword arguments are forwarded to :func:`soundfile.write` (e.g. ``subtype='PCM_24'``). Samples outside ``[-1.0, 1.0]`` will clip in fixed-point formats; consider calling :meth:`clip` or :meth:`normalize` first. Raises ``ValueError`` if ``self.sample_rate`` is ``None``. """ if self._sample_rate is None: raise ValueError("Cannot write audio without a sample_rate.") sf.write(file, self._samples, self._sample_rate, **kwargs)
# --- numpy interop ------------------------------------------------------ def __array__(self, dtype=None, copy=None) -> np.ndarray: """NumPy's "convert to ndarray" hook. Lets ``Audio`` be passed transparently to any function that calls ``np.asarray(...)`` internally (``np.mean``, ``np.fft.rfft``, ``matplotlib.plot``, ...). Returns the underlying samples by reference on the fast path; copies only when the caller requests a dtype change or ``copy=True``. """ if dtype is None and not copy: return self._samples return self._samples.astype(dtype if dtype is not None else self._samples.dtype) # --- Indexing / length -------------------------------------------------- def __len__(self) -> int: return self.num_samples def __getitem__(self, key) -> "Audio": """Returns a new ``Audio`` wrapping the indexed samples. Only patterns that preserve the ``(num_samples, num_channels)`` layout are accepted — i.e., the sample axis (axis 0) must be sliced, not collapsed to a single int. Examples (``audio`` has shape ``(10000, 2)``):: audio[1000:2000] # → Audio with shape (1000, 2) audio[:, 0] # → Audio with shape (10000, 1) audio[1000:2000, 1:3] # → Audio with shape (1000, 2) Indexing axis 0 with a single ``int`` (``audio[1000]``, ``audio[0, 0]``) is rejected with ``TypeError`` — it's ambiguous as Audio, and almost always either a scalar read (use ``audio.samples[i, j]``) or a length-1 slice (use ``audio[i:i+1]``). The returned ``Audio`` is a view of the underlying samples when the key supports it, and carries the same ``sample_rate``. """ first = key[0] if isinstance(key, tuple) and key else key if isinstance(first, (int, np.integer)): raise TypeError( f"Audio[...] does not support indexing the sample axis with " f"a single int (got {key!r}). Use audio.samples[...] for raw " f"numpy access, or audio[i:i+1] for a length-1 Audio." ) return Audio(self._samples[key], sample_rate=self._sample_rate) def __setitem__(self, key, value) -> None: self._samples[key] = value # --- Arithmetic --------------------------------------------------------- def _check_compatible(self, other: "Audio") -> Optional[int]: """Validates that ``other`` can be combined with ``self`` and returns the sample rate the result should carry.""" if self.shape != other.shape: raise ValueError(f"Shape mismatch: {self.shape} vs {other.shape}.") if ( self._sample_rate is not None and other._sample_rate is not None and self._sample_rate != other._sample_rate ): raise ValueError( f"Sample rate mismatch: {self._sample_rate} != {other._sample_rate}." ) return ( self._sample_rate if self._sample_rate is not None else other._sample_rate ) def _binary_op(self, other, op) -> "Audio": if isinstance(other, Audio): sr = self._check_compatible(other) return Audio(op(self._samples, other._samples), sample_rate=sr) return Audio(op(self._samples, other), sample_rate=self._sample_rate) def _ibinary_op(self, other, op) -> "Audio": if isinstance(other, Audio): self._check_compatible(other) op(self._samples, other._samples) else: op(self._samples, other) return self def __add__(self, other) -> "Audio": return self._binary_op(other, lambda a, b: a + b) def __radd__(self, other) -> "Audio": return Audio(other + self._samples, sample_rate=self._sample_rate) def __iadd__(self, other) -> "Audio": def _iadd(a, b): a += b return self._ibinary_op(other, _iadd) def __sub__(self, other) -> "Audio": return self._binary_op(other, lambda a, b: a - b) def __rsub__(self, other) -> "Audio": return Audio(other - self._samples, sample_rate=self._sample_rate) def __isub__(self, other) -> "Audio": def _isub(a, b): a -= b return self._ibinary_op(other, _isub) def __mul__(self, other) -> "Audio": return self._binary_op(other, lambda a, b: a * b) def __rmul__(self, other) -> "Audio": return Audio(other * self._samples, sample_rate=self._sample_rate) def __imul__(self, other) -> "Audio": def _imul(a, b): a *= b return self._ibinary_op(other, _imul) def __truediv__(self, other) -> "Audio": return self._binary_op(other, lambda a, b: a / b) def __itruediv__(self, other) -> "Audio": def _idiv(a, b): a /= b return self._ibinary_op(other, _idiv) def __neg__(self) -> "Audio": return Audio(-self._samples, sample_rate=self._sample_rate) def __repr__(self) -> str: return ( f"Audio(num_samples={self.num_samples}, " f"num_channels={self.num_channels}, " f"sample_rate={self._sample_rate})" )