228 lines
6.9 KiB
Python
228 lines
6.9 KiB
Python
# Python-SoXR
|
|
# https://github.com/dofuuz/python-soxr
|
|
|
|
# SPDX-FileCopyrightText: (c) 2021 Myungchul Keum
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
|
|
# High quality, one-dimensional sample-rate conversion library for Python.
|
|
# Python-SoXR is a Python wrapper of libsoxr.
|
|
|
|
import numpy as np
|
|
from numpy.typing import ArrayLike
|
|
|
|
from . import soxr_ext
|
|
from .soxr_ext import QQ, LQ, MQ, HQ, VHQ
|
|
from ._version import version as __version__
|
|
|
|
|
|
# libsoxr version as reported by `soxr_ext`.
__libsoxr_version__ = soxr_ext.libsoxr_version()

# libsoxr allocates memory for each channel separately,
# so too many channels will cause a memory error.
_CH_LIMIT = 65536

_DTYPE_UNMATCH_ERR_STR = (
    'Input should be a `np.ndarray` with matching dtype for ResampleStream({}).'
)
# The `{}` placeholder is left for the offending channel count; only the
# upper bound is filled in here.
_CH_EXEED_ERR_STR = (
    'Channel num({}) out of limit. Should be in [1, %d]' % (_CH_LIMIT,)
)
_DTYPE_ERR_STR = (
    'Data type must be one of [float32, float64, int16, int32], not {}'
)
_QUALITY_ERR_STR = "Quality must be one of [QQ, LQ, MQ, HQ, VHQ]"
|
|
|
|
# Lookup table accepted by `_quality_to_enum`: every quality constant maps to
# itself, and so do its lower-case short name and its `soxr_`-prefixed name.
_QUALITY_ENUM_DICT = {
    key: level
    for short_name, level in (
        ('vhq', VHQ),
        ('hq', HQ),
        ('mq', MQ),
        ('lq', LQ),
        ('qq', QQ),
    )
    for key in (level, short_name, f'soxr_{short_name}')
}
|
|
|
|
|
|
def _quality_to_enum(q):
    """ Translate a user-supplied quality setting into a quality constant.

    Parameters
    ----------
    q : quality constant or str
        A key of `_QUALITY_ENUM_DICT`. Strings are lower-cased before lookup,
        so they are matched case-insensitively.

    Returns
    -------
    The entry of `_QUALITY_ENUM_DICT` stored under `q`.

    Raises
    ------
    ValueError
        If `q` is not a known key, or cannot be used as a dict key at all.
    """
    key = q.lower() if isinstance(q, str) else q

    try:
        return _QUALITY_ENUM_DICT[key]
    except (KeyError, TypeError):
        # The failed lookup is an implementation detail; suppress it so the
        # caller only sees the ValueError listing the accepted values.
        raise ValueError(_QUALITY_ERR_STR) from None
|
|
|
|
|
|
def _to_soxr_datatype(ntype):
    """ Return the `soxr_ext` datatype constant matching a NumPy type.

    Parameters
    ----------
    ntype : np.dtype or type
        Compared against float32, float64, int32 and int16, in that order.

    Raises
    ------
    TypeError
        If `ntype` equals none of the supported types.
    """
    candidates = (
        (np.float32, 'SOXR_FLOAT32_I'),
        (np.float64, 'SOXR_FLOAT64_I'),
        (np.int32, 'SOXR_INT32_I'),
        (np.int16, 'SOXR_INT16_I'),
    )
    for np_type, const_name in candidates:
        if ntype == np_type:
            # Resolved lazily so that only the matching constant is looked up.
            return getattr(soxr_ext, const_name)

    raise TypeError(_DTYPE_ERR_STR.format(ntype))
|
|
|
|
|
|
class ResampleStream:
    """ Streaming resampler

    Converts a signal chunk by chunk. Intended for real-time processing
    or for signals that are very long.

    Parameters
    ----------
    in_rate : float
        Input sample-rate. Must be greater than 0.
    out_rate : float
        Output sample-rate. Must be greater than 0.
    num_channels : int
        Number of channels, within [1, `_CH_LIMIT`].
    dtype : type or str, optional
        Data type the stream processes.
        Should be one of float32, float64, int16, int32.
        Every chunk given to `resample_chunk` must use this dtype.
    quality : int or str, optional
        Quality setting.
        One of `QQ`, `LQ`, `MQ`, `HQ`, `VHQ`.
    """

    def __init__(self,
                 in_rate: float, out_rate: float, num_channels: int,
                 dtype='float32', quality='HQ'):
        if any(rate <= 0 for rate in (in_rate, out_rate)):
            raise ValueError('Sample rate should be over 0')

        if num_channels < 1 or num_channels > _CH_LIMIT:
            raise ValueError(_CH_EXEED_ERR_STR.format(num_channels))

        sample_dtype = np.dtype(dtype)
        self._type = sample_dtype
        soxr_dtype = _to_soxr_datatype(sample_dtype)
        quality_enum = _quality_to_enum(quality)

        core = soxr_ext.CSoxr(
            in_rate, out_rate, num_channels, soxr_dtype, quality_enum)
        self._csoxr = core
        # Bind the dtype-specific processing method once, up front.
        self._process = getattr(core, f'process_{sample_dtype}')

    def resample_chunk(self, x: np.ndarray, last=False) -> np.ndarray:
        """ Resample one chunk of the stream

        Parameters
        ----------
        x : np.ndarray
            Chunk to resample: mono (1D) or multi-channel
            (2D of [frame, channel]).
            Must be exactly an `np.ndarray` whose dtype equals the one given
            to the constructor.

        last : bool, optional
            Pass True with the final chunk to flush the remaining output.
            It should be `True` only once, at the end of a continuous sequence.

        Returns
        -------
        np.ndarray
            Resampled data, with the same ndim as the input.

        Raises
        ------
        TypeError
            If `x` is not an `np.ndarray` or its dtype does not match.
        ValueError
            If `x` is neither 1-D nor 2-D.
        """
        if type(x) is not np.ndarray or x.dtype != self._type:
            raise TypeError(_DTYPE_UNMATCH_ERR_STR.format(self._type))

        dims = x.ndim
        if dims == 2:
            return self._process(x, last)
        if dims != 1:
            raise ValueError('Input must be 1-D or 2-D array')

        # Mono: add a channel axis for processing, then drop it again.
        resampled = self._process(x[:, np.newaxis], last)
        return np.squeeze(resampled, axis=1)

    def num_clips(self) -> int:
        """ Clip counter. (for int I/O)

        Returns
        -------
        int
            Count of clipped samples.
        """
        return self._csoxr.num_clips()

    def delay(self) -> float:
        """ Get current delay.

        SoXR output has an algorithmic delay. This returns the length of
        the output that is currently pending.

        Returns
        -------
        float
            Current delay in output samples.
        """
        return self._csoxr.delay()

    def clear(self) -> None:
        """ Reset resampler. Ready for fresh signal, same config.

        Reusing a cleared stream can save initialization time.
        """
        self._csoxr.clear()
|
|
|
|
|
|
def resample(x: ArrayLike, in_rate: float, out_rate: float, quality='HQ') -> np.ndarray:
    """ Resample signal

    Parameters
    ----------
    x : array_like
        Input array. Input can be mono(1D) or multi-channel(2D of [frame, channel]).
        If input is not exactly an `np.ndarray`, it will be converted to
        `np.ndarray(dtype='float32')`.
        Its dtype should be one of float32, float64, int16, int32.
    in_rate : float
        Input sample-rate. Must be greater than 0.
    out_rate : float
        Output sample-rate. Must be greater than 0.
    quality : int or str, optional
        Quality setting.
        One of `QQ`, `LQ`, `MQ`, `HQ`, `VHQ`.

    Returns
    -------
    np.ndarray
        Resampled data.
        Output is `np.ndarray` with same ndim and dtype with input.

    Raises
    ------
    ValueError
        If a sample rate is not positive, the input is not 1-D or 2-D,
        the channel count is outside [1, `_CH_LIMIT`], or `quality` is
        not recognized.
    TypeError
        If the dtype of the input is not supported.
    """
    if in_rate <= 0 or out_rate <= 0:
        raise ValueError('Sample rate should be over 0')

    if type(x) is not np.ndarray:
        x = np.asarray(x, dtype=np.float32)

    ndim_err = 'Input must be 1-D or 2-D array'
    if x.ndim == 0:
        # A 0-D array (e.g. a scalar input) has an empty `strides` tuple, so
        # the layout probe below would fail with an unrelated IndexError.
        raise ValueError(ndim_err)

    # First-axis stride equal to the item size: split channel memory layout.
    if x.strides[0] == x.itemsize:
        proc_prefix = 'csoxr_split_ch'
    else:
        proc_prefix = 'csoxr_divide_proc'

    try:
        divide_proc = getattr(soxr_ext, f'{proc_prefix}_{x.dtype}')
    except AttributeError:
        # A missing per-dtype entry point means the dtype is unsupported;
        # the AttributeError itself is of no use to the caller.
        raise TypeError(_DTYPE_ERR_STR.format(x.dtype)) from None

    q = _quality_to_enum(quality)

    if x.ndim == 1:
        # Mono: add a channel axis for processing, then drop it again.
        y = divide_proc(in_rate, out_rate, x[:, np.newaxis], q)
        return np.squeeze(y, axis=1)

    if x.ndim == 2:
        num_channels = x.shape[1]
        if num_channels < 1 or _CH_LIMIT < num_channels:
            raise ValueError(_CH_EXEED_ERR_STR.format(num_channels))

        return divide_proc(in_rate, out_rate, x, q)

    raise ValueError(ndim_err)
|
|
|
|
|
|
def _resample_oneshot(x: np.ndarray, in_rate: float, out_rate: float, quality='HQ') -> np.ndarray:
    """
    Resample using libsoxr's `soxr_oneshot()`. Use `resample()` for general use.
    `soxr_oneshot()` becomes slow with long input.
    This function exists for test purpose.

    Sample rates and the number of dimensions are not validated here;
    a 1-D input gets a channel axis added for processing and removed from
    the result, any other input is passed on unchanged.

    Raises
    ------
    TypeError
        If `soxr_ext` has no one-shot entry point for the dtype of `x`.
    ValueError
        If `quality` is not recognized.
    """
    try:
        oneshot = getattr(soxr_ext, f'csoxr_oneshot_{x.dtype}')
    except AttributeError:
        # The missing attribute only signals an unsupported dtype; keep it
        # out of the traceback shown to the caller.
        raise TypeError(_DTYPE_ERR_STR.format(x.dtype)) from None

    is_mono = x.ndim == 1
    frames = x[:, np.newaxis] if is_mono else x
    y = oneshot(in_rate, out_rate, frames, _quality_to_enum(quality))
    return np.squeeze(y, axis=1) if is_mono else y
|