Source code for streaming._iterator

"""
Iterator
========

The :mod:`streaming.iterator` module is a private module. Most algorithms for streams are implemented here using basic iterators.
"""
import cytoolz
import itertools
import operator
import numpy as np
import operator
import collections
from ._cython import _interpolate_linear as interpolate_linear
from ._cython import _filter_ba, diff

try:
    from scipy.signal import fftconvolve as _convolve
except ImportError:
    _convolve = np.convolve

[docs]def blocks(iterable, nblock, noverlap=0): """Partition iterable into blocks. :param iterable: Iterable. :param nblock: Samples per block. :param noverlap: Amount of samples to overlap :returns: Blocks. """ # We use a different function for performance reasons if noverlap==0: return _blocks(iterable, nblock) else: return _overlapping_blocks(iterable, nblock, noverlap)
def _blocks(iterable, nblock): """Partition iterable into blocks. :param iterable: Iterable. :param nblock: Samples per block. :returns: Blocks. """ iterator = iter(iterable) partitions = cytoolz.partition(nblock, iterator) yield from partitions def _overlapping_blocks(iterable, nblock, noverlap): """Partition iterable into overlapping blocks of size `nblock`. :param iterable: Iterable. :param nblock: Samples per block. :param noverlap: Amount of samples to overlap. :returns: Blocks. """ iterator = iter(iterable) nadvance = nblock - noverlap if nadvance < 1: raise ValueError("`noverlap` has to be smaller than `nblock-1`.") # First `noverlap` samples previous = list(cytoolz.take(noverlap, iterator)) advances = map(list, cytoolz.partition(nadvance, iterator)) for advance in advances: block = previous + advance # Concat lists yield block previous = block[-noverlap:] def change_blocks(iterator, nblock, noverlap, nblock_new, noverlap_new): """Change blocksize and/or overlap of iterator. :param iterator: Iterator. :param nblock: Current blocksize. :param noverlap: Current overlap. :param nblock_new: New blocksize. :param noverlap_new: New overlap. :returns: Iterator with new blocksize and/or overlap. """ # Same block size, same overlap if nblock_new==nblock and noverlap_new==noverlap: return iterator # New block size is multiple of old block size, same overlap elif not nblock_new % nblock and noverlap_new==noverlap: # factor is multiple of current blocksize factor = nblock_new // nblock # therefore we concat `factor` blocks into a new block partitioned = map(np.concatenate, cytoolz.partition(factor, iterator)) return partitioned # Old block size is multiple of new block size, sample overlap elif not nblock % nblock_new and noverlap_new==noverlap: # Partition each block in blocks with size nblock_new partition = lambda x: cytoolz.partition(nblock_new, x) # And chain the iterables partitioned = itertools.chain.from_iterable(map(partition, iterator)) return partitioned # Convert to samples and create blocks else: return blocks(samples(iterator, nblock, noverlap), nblock_new, noverlap_new) def samples(iterator, nblock, noverlap=0): """Convert iterator with (overlapped) blocks to iterator with individual samples. :param iterator: Iterator. :param nblock: Samples per block :param noverlap: Amount of samples to overlap """ if noverlap!=0: nadvance = nblock - noverlap iterator = map(lambda x: x[0:nadvance], iterator) yield from itertools.chain.from_iterable(iterator) # Some convenience functions def sliding_mean(iterable, nwindow, noverlap=0): """Sliding mean. :param iterable: Iterable. :param nwindow: Window size in samples. :param noverlap: Amount of samples to overlap. :returns: Iterable of means. """ yield from map(np.mean, blocks(iterable, nwindow, noverlap)) def sliding_std(iterable, nwindow, noverlap=0): """Sliding standard deviation. :param iterable: Iterable. :param nwindow: Window size in samples. :param noverlap: Amount of samples to overlap. :returns: Iterable of standard deviations. """ yield from map(np.std, blocks(iterable, nwindow, noverlap)) def sliding_var(iterable, nwindow, noverlap=0): """Sliding variance. :param iterable: Iterable. :param nwindow: Window size in samples. :param noverlap: Amount of samples to overlap. :returns: Iterable of standard deviations. """ yield from map(np.var, blocks(iterable, nwindow, noverlap)) # Convolution
[docs]def convolve(signal, impulse_responses, nblock, ntaps=None, initial_values=None): """Convolve signal with impulse response. :param signal: Signal, not in blocks. :param impulse_responses: Impulse responses of length `ntaps`. :param nblock: Blocksize to use for the convolution. :param ntaps: Length of impulse responses. :param initial_values. Value to use before convolution kicks in. .. note:: This function takes samples and yields samples. It wraps :func:`convolve_overlap_add` and therefore requires a blocksize for computing the convolution. """ signal = blocks(signal, nblock) convolved = convolve_overlap_add(signal, impulse_responses, nblock=nblock, ntaps=ntaps, initial_values=initial_values) yield from itertools.chain.from_iterable(convolved)
def _convolve_crossfade(block, ir1, ir2, fading1): """Convolve block with two impulse responses and crossfade the result. :param block: Block. :param ir1: Impulse response 1. :param ir2: Impulse response 2. :param fading1: Fading. :returns: Crossfaded convolutions of block and impulse responses. We switch from `ir1` to `ir2`. A more efficient method is presented in *Efficient time-varying FIR filtering using crossfading implemented in the DFT domain* by Frank Wefers. """ # Convolve segment with both impulse responses. convolved1 = _convolve(block, ir1, mode='full') convolved2 = _convolve(block, ir2, mode='full') # Fading windows fading2 = 1. - fading1 # Crossfaded convolutions convolved = convolved1 * fading1 + convolved2 * fading2 return convolved def convolve_overlap_add_spectra(signal, spectra, nblock, nbins, initial_values=None): """Convolve iterable `signal` with impulse responses of `spectra. This function directly uses the spectra to compute the acyclic convolution. """ return NotImplemented
[docs]def convolve_overlap_add(signal, impulse_responses, nhop, ntaps, initial_values=None): """Convolve iterable `signal` with time-variant `impulse_responses`. :param signal: Signal in blocks of size `nblock`. :param impulse_responses: Impulse responses of length `ntaps`. :param nhop: Impulse responses is updated every `nhop` samples. This should correspond to the blocksize of `signal`. :param ntaps: Length of impulse responses. :param initial_values. Value to use before convolution kicks in. :returns: Convolution. This function implements the overlap-add method. Time-variant `impulse_responses is supported. Each item (`block`) in `signal` corresponds to an impulse response (`ir`) in `impulse_responses`. This implementation of overlap-add buffers only one segment. Therefore, only `nblock>=ntaps` is supported. .. warning:: This function cannot be used when `ntaps > nblock`. .. seealso:: :func:`convolve_overlap_save` """ nblock = nhop if not nblock >= ntaps: raise ValueError("Amount of samples in block should be the same or more than the amount of filter taps.") if initial_values is None: tail_previous_block = np.zeros(ntaps-1) else: tail_previous_block = initial_values for block, ir in zip(signal, impulse_responses): # The result of the convolution consists of a head, a body and a tail # - the head and tail have length `taps-1` # - the body has length `signal-taps+1`?? try: convolved = _convolve(block, ir, mode='full') except ValueError: raise GeneratorExit # The final block consists of # - the head and body of the current convolution # - and the tail of the previous convolution. resulting_block = convolved[:-ntaps+1] #print(resulting_block) #resulting_block[:ntaps-1] += tail_previous_block resulting_block[:ntaps-1] = resulting_block[:ntaps-1] + tail_previous_block # Because of possibly different dtypes # We store the tail for the next cycle tail_previous_block = convolved[-ntaps+1:] #print(tail_previous_block) # Yield the result of this step yield resulting_block
[docs]def convolve_overlap_discard(signal, impulse_response, nblock_in=None, nblock_out=None): """Convolve signal with linear time-invariant `impulse_response` using overlap-discard method. :param signal: Signal. Can either consists of blocks or samples. `nblock_in` should be set to the block size of the signal. :param impulse_response: Linear time-invariant impulse response of filter. :param nblock_in: Actual input blocksize of signal. Should be set to `None` is `signal` is sample-based. :param nblock_out: Desired output blocksize. :returns: Convolution of `signal` with `impulse_response`. :rtype: Generator consisting of arrays. Setting the input blocksize can be useful because this gives control over the delay of the process. Setting the output blocksize is convenient because you know on beforehand the output blocksize. Setting neither will result in blocksize of one, or individual samples. This will be slow. Setting both is not possible. .. note:: The *overlap-discard* method is more commonly known as *overlap-save*. """ # Amount of filter taps ntaps = len(impulse_response) # Amount of overlap that is needed noverlap = ntaps -1 # In the following block we create overlapping windows. # Both are set. if nblock_in is not None and nblock_out is not None: raise ValueError("Set block size of either input or output.") # Only output blocksize is explicitly mentioned elif nblock_out is not None: nblock_in = nblock_out + ntaps -1 # Only input blocksize is explicitly mentioned elif nblock_in is not None: if not nblock_in >= ntaps: raise ValueError("Amount of samples in block should be the same or more than the amount of filter taps.") nblock_out = nblock_in - ntaps + 1 else: nblock_in = ntaps nblock_out = nblock_in - ntaps + 1 windows = blocks(signal, nblock_in, noverlap) ## We have sample-based signal and we want blocks with specified size out. #if nblock_in is None and nblock_out is not None: #nblock_in = nblock_out + ntaps -1 #windows = blocks(signal, nblock_in, noverlap) ## We have sample-based signal and we want samples out (actually blocks of size 1). #elif nblock_in is None and nblock_out is None: #nblock_in = ntaps #windows = blocks(signal, nblock_in, noverlap) ## We have block-based signal and we don't mind output block size #elif nblock_in is not None and nblock_out is None: #if not nblock_in >= ntaps: #raise ValueError("Amount of samples in block should be the same or more than the amount of filter taps.") #nblock_out = nblock_in - ntaps + 1 #windows = change_blocks(signal, nblock_in, 0, nblock_in, noverlap) ## We have block-based signal and we have specified an output block. We need to change the block size. #elif nblock_in is not None and nblock_out is not None: #if not nblock_in >= ntaps: #raise ValueError("Amount of samples in block should be the same or more than the amount of filter taps.") #nblock_in_new = nblock_out + ntaps -1 #windows = change_blocks(signal, nblock_in, 0, nblock_in_new, noverlap, ) #nblock_in = nblock_in_new # Convolve function to use _convolve_func = lambda x: _convolve(x, impulse_response, mode='valid') # Convolved blocks convolved = map(_convolve_func, windows ) return convolved, nblock_out
def convolve_overlap_save(signal, impulse_responses, nhop, ntaps): """Convolve signal with linear time-invariant `impulse_response` using overlap-discard method. :param signal: Signal. Consists of samples. :param impulse_responses: Linear time-variant impulses response of filter. :param nhop: Impulse response is renewed every `nhop` samples. :returns: Convolution of `signal` with `impulse_responses`. :rtype: Generator consisting of arrays. .. note:: The *overlap-discard* method is more commonly known as *overlap-save*. """ nwindow = nhop + ntaps - 1 noverlap = ntaps - 1 windows = blocks(signal, nwindow, noverlap) # Convolve function to use _convolve_func = lambda x, y: _convolve(x, y, mode='valid') # Convolved blocks convolved = map(_convolve_func, windows, impulse_responses ) yield from convolved def cumsum(iterator): """Cumulative sum. .. seealso:: :func:`itertools.accumulate` and :func:`np.cumsum` """ yield from itertools.accumulate(iterator, operator.add) def cummul(iterator): """Cumulative p. .. seealso:: :func:`itertools.accumulate` and :func:`np.cumsum` """ yield from itertools.accumulate(iterator, operator.mul) #def diff(iterator, initial_value=0.0): #"""Differentiate `iterator`. #""" #current = next(iterator) #while True: #old = current #current = next(iterator) #yield current-old #def integrate(iterator, initial_value=0.0): #total = 0.0 #while True: #total += next(iterator) #yield total
[docs]def vdl(signal, times, delay, initial_value=0.0): """Variable delay line which delays `signal` at 'times' with 'delay'. :param signal: Signal to be delayed. :type signal: Iterator :param delay: Delay. :type delay: Iterator :param initial_value: Sample to yield before first actual sample is yielded due to initial delay. .. note:: Times and delay should have the same unit, e.g. both in samples or both in seconds. """ dt0, delay = cytoolz.peek(delay) times, _times = itertools.tee(times) # Yield initial value before interpolation kicks in # Note that this method, using tee, buffers all samples that will be discarded. # Therefore, room for optimization! n = 0 if initial_value is not None: while next(_times) < dt0: n += 1 yield initial_value times1, times2 = itertools.tee(times) interpolated = interpolate_linear(map(operator.add, times2, delay), signal, times1) yield from cytoolz.drop(n, interpolated) # FIXME: move drop before interpolation, saves memory
# Reference implementation
[docs]def filter_ba_reference(x, b, a): """Filter signal `x` with linear time-invariant IIR filter that has numerator coefficients `b` and denominator coefficients `a`. :param b: Numerator coefficients. :param a: Denominator coefficients. The first value is always a zero. :param x: Signal. :returns: Filtered signal. This function applies a linear time-invariant IIR filter using the difference equation .. math:: y[n] = -\sum_k=1^M a_k y[n-k] + \sum_k=0^(N-1) b_k x[n-k] """ b = np.array(b) a = np.array(a[1:]) na = len(a) nb = len(b) # Buffers xd = collections.deque([0]*nb, nb) yd = collections.deque([0]*na, na) # Invert filter coefficients order b = b[::-1] a = a[::-1] while True: # Update inputs buffer with new signal value xd.append(next(x)) # Calculate output from difference equation result = -sum(a*yd) + sum(b*xd) # Update outputs buffer with new output value yd.append(result) # Yield current output value yield result
[docs]def filter_ba(x, b, a): """Apply IIR filter to `x`. :param x: Signal. :param b: Numerator coefficients. :param a: Denominator coefficients. :returns: Filtered signal. .. seealso:: :func:`filter_sos` and :func:`scipy.signal.lfilter` """ a = a[1:] # Drop the first value that is a one. See difference equation. # Drop trailing zeros. They're not contributing and introduce a bug as well (leading zero in result). while a[-1] == 0.0: a = a[:-1] while b[-1] == 0.0: b = b[:-1] a = np.array(a) b = np.array(b) na = len(a) nb = len(b) # Buffers xd = np.zeros(nb) yd = np.zeros(na) yield from _filter_ba(x, b, a, xd, yd, nb, na)
[docs]def filter_sos(x, sos): """Apply IIR filter to `x`. :param x: Signal. :param sos: Second-order sections. :returns: Filtered signal. .. seealso:: :func:`filter_ba` and :func:`scipy.signal.sosfilt` """ sos = np.atleast_2d(sos) if sos.ndim != 2: raise ValueError('sos array must be 2D') n_sections, m = sos.shape if m != 6: raise ValueError('sos array must be shape (n_sections, 6)') for section in sos: x = filter_ba(x, section[:3], section[3:]) yield from x
__all__ = ['blocks', 'convolve_overlap_add', 'convolve', 'convolve_overlap_discard', 'diff', 'interpolate_linear', 'filter_ba', 'filter_ba_reference', 'filter_sos', 'vdl']