"""
Stream
======
The Stream module contains the classes :class:`Stream` and :class:`BlockStream`.
"""
import collections
import itertools
import numpy as np
import operator
import cytoolz
from multipledispatch.conflict import AmbiguityWarning
import warnings
warnings.filterwarnings("ignore", category=AmbiguityWarning)
#import streaming
from streaming.operators import *
from streaming.abstractstream import *
from streaming.itertools import *
class Stream(AbstractStream):
    """Stream of samples.

    Every item produced by the underlying iterator is handled as one sample.
    """

    def _construct(self, iterator):
        """Wrap `iterator` in a new :class:`Stream`."""
        return Stream(iterator)

    @property
    def nblock(self):
        """Blocksize, which a sample stream does not have.

        :raises AttributeError: always.
        """
        raise AttributeError("Stream does not have a blocksize")

    def blocks(self, nblock, noverlap=0):
        """Group the samples into blocks.

        :param nblock: Amount of samples per block.
        :param noverlap: Overlap passed on to the blocking helper and the resulting stream.
        :returns: :class:`BlockStream` whose blocks are converted with :func:`numpy.array`.
        """
        raw_blocks = streaming._iterator.blocks(self._iterator, nblock, noverlap)
        array_blocks = map(np.array, raw_blocks)
        return BlockStream(array_blocks, nblock, noverlap)

    def drop(self, nsamples):
        """Drop the first `nsamples` samples."""
        return super().drop(nsamples)

    def map(self, func):
        """Apply `func` to every sample and return the results as a new stream."""
        mapped = map(func, self._iterator)
        return self._construct(mapped)

    def take(self, nsamples):
        """Take the first `nsamples` samples."""
        return super().take(nsamples)

    def samples(self):
        """Return the stream itself, as it already consists of samples."""
        return self
class BlockStream(AbstractStream):
    """Stream of blocks of samples.

    :param iterator: Iterator that yields the blocks.
    :param nblock: Amount of samples per block.
    :param noverlap: Overlap between blocks; may be at most `nblock - 1`.
    :raises ValueError: if `noverlap` is larger than `nblock - 1`.
    """

    def __init__(self, iterator, nblock, noverlap=0):
        # We either peek, and we know the type and blocksize assuming a
        # homogeneous stream, or we don't and we need `nblock` as argument and
        # hope the blocks are of the right type.
        #
        # The peeking variant (which also required every item to be an
        # `np.ndarray`, because the idea of the blockstream is to use
        # vectorized operations) is currently disabled.
        # With that check disabled we get failing tests for eq and ne operations.
        # See https://github.com/numpy/numpy/issues/5016
        super().__init__(iterator)
        if noverlap > nblock-1:
            raise ValueError("`noverlap` should be smaller than `nblock-1`")
        self._nblock = nblock
        self._noverlap = noverlap

    def _construct(self, iterator):
        """Wrap `iterator` in a :class:`BlockStream` with this stream's blocksize and overlap."""
        return BlockStream(iterator, self.nblock, self.noverlap)

    @property
    def nblock(self):
        """Amount of samples per block."""
        return self._nblock

    @property
    def noverlap(self):
        """Overlap between blocks, as given at construction."""
        return self._noverlap

    def blocks(self, nblock, noverlap=0):
        """Change the blocksize and overlap of the stream.

        :param nblock: New amount of samples per block.
        :param noverlap: New overlap.
        :returns: :class:`BlockStream` whose blocks are converted with :func:`numpy.array`.
        """
        blocks = streaming._iterator.change_blocks(
            self._iterator, self.nblock, self.noverlap, nblock, noverlap)
        return BlockStream(map(np.array, blocks), nblock, noverlap)

    def drop(self, n):
        """Drop the first `n` blocks.

        .. note:: If you want to drop `n` samples, use `s.samples().drop(n)`.
        """
        return self._construct(cytoolz.drop(n, self))

    def map(self, func):
        """Map `func` to each block in :class:`BlockStream`.
        """
        return self._construct(map(func, self))

    def mean(self):
        """Mean value calculated over `nblock` samples.

        This function returns a :class:`Stream`.
        """
        return Stream(self.map(np.mean))

    def nblocks(self):
        """Amount of blocks in stream.

        .. warning:: This consumes the stream.

        :returns: The number of blocks.
        """
        # The module-level `count` in this file wraps `itertools.count` and
        # builds an endless stream; it does not count items. `cytoolz.count`
        # counts the items of a lazy sequence by exhausting it.
        return cytoolz.count(self)

    def std(self):
        """Standard deviation calculated over `nblock` samples.

        This function returns a :class:`Stream`.
        """
        return Stream(self.map(np.std))

    def sum(self):
        """Sum calculated over `nblock` samples.

        This function returns a :class:`Stream`.
        """
        return Stream(self.map(np.sum))

    def take(self, nblocks):
        """Take `nblocks` from stream.
        """
        return self._construct(cytoolz.take(nblocks, self._iterator))

    def samples(self):
        """Iterate over samples.

        :returns: Stream of samples instead of blocks. Possible overlap is taken into account.
        """
        return Stream(streaming._iterator.samples(self._iterator, self.nblock, self.noverlap))

    def var(self):
        """Variance calculated over `nblock` samples.

        This function returns a :class:`Stream`.
        """
        return Stream(self.map(np.var))
# Binary operators (AbstractStream, Stream)
def _binary_op_abstractstream_stream(op, a, b):
    """Apply `op` to an :class:`AbstractStream` and a :class:`Stream`.

    Forwards the arguments unchanged to the generic
    (AbstractStream, AbstractStream) implementation.
    """
    result = _binary_op_abstractstream_abstractstream(op, a, b)
    return result
# Binary operators (AbstractStream, BlockStream)
def _binary_op_abstractstream_blockstream(op, a, b):
    """Apply `op` to an :class:`AbstractStream` and a :class:`BlockStream`.

    The block stream `b` is first turned into a stream of samples, after which
    the (AbstractStream, Stream) implementation is used.
    """
    b_samples = b.samples()
    return _binary_op_abstractstream_stream(op, a, b_samples)
# Binary operators (Stream, Stream)
def _binary_op_stream_stream(op, a, b):
    """Apply `op` pairwise to the items of two :class:`Stream` objects.

    :returns: :class:`Stream` of `op(x, y)` for the paired items of `a` and `b`.
    """
    paired_results = map(op, a._iterator, b._iterator)
    return Stream(paired_results)
# Binary operators (BlockStream, BlockStream)
def _binary_op_blockstream_blockstream(op, a, b):
    """Apply `op` to two :class:`BlockStream` objects.

    When both streams share the same blocksize and overlap, `op` is applied
    block by block and the result keeps that blocksize and overlap. Otherwise
    both streams are converted to samples and `op` is applied to those.

    :param op: Binary operation to apply.
    :param a: Left operand.
    :param b: Right operand.
    :returns: A :class:`BlockStream` for matching layouts, else the result of
        `op` on the two sample streams.
    """
    # Blocks can only be paired directly when they cover the same samples,
    # which requires the overlap to match as well as the blocksize.
    same_layout = a.nblock == b.nblock and a.noverlap == b.noverlap
    if same_layout:
        return BlockStream(map(op, a, b), nblock=a.nblock, noverlap=a.noverlap)
    # Blocksizes that are integer multiples of each other are not
    # special-cased; they also take the sample-wise route.
    return op(a.samples(), b.samples())
# Binary operators (Stream, object)
def _binary_op_stream_object(op, a, b):
    """Apply `op` to every item of :class:`Stream` `a` with the object `b` on the right.

    :returns: :class:`Stream` of `op(item, b)`.
    """
    results = (op(sample, b) for sample in a._iterator)
    return Stream(results)
# Binary operators (object, Stream)
def _binary_op_object_stream(op, a, b):
    """Apply `op` to every item of :class:`Stream` `b` with the object `a` on the left.

    :returns: :class:`Stream` of `op(a, item)`.
    """
    results = (op(a, sample) for sample in b._iterator)
    return Stream(results)
# Binary operators (BlockStream, object)
def _binary_op_blockstream_object(op, a, b):
    """Apply `op` to every block of :class:`BlockStream` `a` with the object `b` on the right.

    :param op: Binary operation to apply.
    :param a: Block stream providing the left operands.
    :param b: Object used as the right operand for every block.
    :returns: :class:`BlockStream` of `op(block, b)` with the blocksize and
        overlap of `a`.
    """
    # The operation is applied per block and does not change how the blocks
    # are laid out, so the overlap of `a` is carried over with the blocksize.
    results = (op(block, b) for block in a._iterator)
    return BlockStream(results, a.nblock, a.noverlap)
# Binary operators (object, BlockStream)
def _binary_op_object_blockstream(op, a, b):
    """Apply `op` to every block of :class:`BlockStream` `b` with the object `a` on the left.

    :param op: Binary operation to apply.
    :param a: Object used as the left operand for every block.
    :param b: Block stream providing the right operands.
    :returns: :class:`BlockStream` of `op(a, block)` with the blocksize and
        overlap of `b`.
    """
    # The operation is applied per block and does not change how the blocks
    # are laid out, so the overlap of `b` is carried over with the blocksize.
    results = (op(a, block) for block in b._iterator)
    return BlockStream(results, b.nblock, b.noverlap)
# Binary operators (Stream, BlockStream)
def _binary_op_stream_blockstream(op, a, b):
    """Apply `op` to a :class:`Stream` and a :class:`BlockStream`.

    The block stream `b` is converted to samples before `op` is called.
    """
    b_samples = b.samples()
    return op(a, b_samples)
#return Stream(map(op, a._iterator, b._iterator))
# Binary operators (BlockStream, Stream)
def _binary_op_blockstream_stream(op, a, b):
    """Apply `op` to a :class:`BlockStream` and a :class:`Stream`.

    The block stream `a` is converted to samples before `op` is called.
    """
    a_samples = a.samples()
    return op(a_samples, b)
#return BlockStream(map(op, a._iterator, b._iterator), a.nblock)
# Type signatures and the implementation registered for each of them, listed
# in registration order.
_BINARY_OP_IMPLEMENTATIONS = (
    ((Stream, Stream), _binary_op_stream_stream),
    ((BlockStream, BlockStream), _binary_op_blockstream_blockstream),
    ((AbstractStream, Stream), _binary_op_abstractstream_stream),
    ((AbstractStream, BlockStream), _binary_op_abstractstream_blockstream),
    ((Stream, object), _binary_op_stream_object),
    ((object, Stream), _binary_op_object_stream),
    ((BlockStream, object), _binary_op_blockstream_object),
    ((object, BlockStream), _binary_op_object_blockstream),
    ((Stream, BlockStream), _binary_op_stream_blockstream),
    ((BlockStream, Stream), _binary_op_blockstream_stream),
)

for op in streaming.operators._BINARY_OPERATORS:
    # Look up the dispatcher that belongs to this operation ...
    D = getattr(streaming.operators, op)
    # ... and register every implementation with the dispatcher bound as `op`.
    for _signature, _implementation in _BINARY_OP_IMPLEMENTATIONS:
        D.add(_signature, partial(_implementation, D))
# Itertools
@tee.register(Stream)
def _(iterable, n=2):
    """Split a :class:`Stream` into `n` streams using :func:`itertools.tee`.

    :returns: Tuple of `n` :class:`Stream` objects.
    """
    branches = itertools.tee(iterable, n)
    return tuple(map(Stream, branches))
@cycle.register(Stream)
def _(iterable):
    """Wrap :func:`itertools.cycle` over a :class:`Stream` in a new :class:`Stream`."""
    endless = itertools.cycle(iterable)
    return Stream(endless)
@toarray.register(Stream)
def _(iterable):
    """Collect all items of a :class:`Stream` into a :class:`numpy.ndarray`.

    The stream is consumed in the process.
    """
    collected = list(iterable)
    return np.array(collected)
@tee.register(BlockStream)
def _(iterable, n=2):
    """Split a :class:`BlockStream` into `n` block streams using :func:`itertools.tee`.

    :param iterable: Block stream to split.
    :param n: Amount of copies to create.
    :returns: Tuple of `n` :class:`BlockStream` objects with the blocksize and
        overlap of `iterable`.
    """
    # Every copy yields the same blocks as the source, so it has to carry the
    # same overlap as well as the same blocksize.
    return tuple(
        BlockStream(it, iterable.nblock, iterable.noverlap)
        for it in itertools.tee(iterable, n)
    )
@cycle.register(BlockStream)
def _(iterable):
    """Wrap :func:`itertools.cycle` over a :class:`BlockStream` in a new :class:`BlockStream`.

    The result uses the blocksize of `iterable`.
    """
    endless_blocks = itertools.cycle(iterable)
    return BlockStream(endless_blocks, nblock=iterable.nblock)
@toarray.register(BlockStream)
def _(iterable):
    """Convert a :class:`BlockStream` to an array by way of its sample stream."""
    sample_stream = iterable.samples()
    return sample_stream.toarray()
# Other helpful functions
def count(start=0, step=1):
    """Count.

    Wraps :func:`itertools.count` in a :class:`Stream`.

    :param start: First value.
    :param step: Difference between consecutive values.

    .. seealso:: :func:`itertools.count`
    """
    counter = itertools.count(start, step)
    return Stream(counter)
@repeat_each.register(Stream)
def _(iterable, n):
    """Apply :func:`repeat_each` to the underlying iterator of a :class:`Stream`.

    :returns: The result wrapped in a new :class:`Stream`.
    """
    repeated = repeat_each(iterable._iterator, n)
    return Stream(repeated)
@repeat_each.register(BlockStream)
def _(iterable, n):
    """Apply :func:`repeat_each` to the samples of a :class:`BlockStream`.

    The result is grouped into blocks of `iterable.nblock` samples again.
    """
    sample_stream = iterable.samples()
    repeated = repeat_each(sample_stream, n)
    return repeated.blocks(iterable.nblock)
# Names exported by a star import of this module.
__all__ = ['Stream', 'BlockStream', 'count']