Source code for streaming.abstractstream

"""
Abstract Streams
================

The module :mod:`streaming.abtractstream` contains an abstract base class for streams.
Furthermore, additional signatures are added to allow the operators defined in :mod:`streaming.operators` to work with `class:`streaming.abstractstream.AbstractStream`.

"""

import abc
import collections
from functools import partial
import itertools
from multipledispatch.conflict import AmbiguityWarning
import numpy as np
import warnings
warnings.filterwarnings("ignore", category=AmbiguityWarning)
import streaming
from streaming.operators import *
from streaming.itertools import *

import cytoolz
#from streaming.operators import _basic_objects

## Add basic operators to module
import sys
_thismodule = sys.modules[__name__]

#def _binary_operator(operator):
    #"""Apply `operator` to binary inputs.
    #"""
    #def op(x, y):
        #if isinstance(y, collections.Stream):
            #yield from map(operator, x, y)
        #else: # Consider it a constant
            #yield from map(lambda i: operator(i, y) for i in x)
    #return op

#def _reversed_binary_operator(operator):
    #"""Apply `operator` to binary inputs.
    #"""
    #def op(y, x):
        #if isinstance(y, collections.Stream):
            #yield from map(operator, x, y)
        #else: # Consider it a constant
            #yield from map(lambda i: operator(i, y) for i in x)
    #return op


###def _unary_operator(operator):
    ###"""Apply `operator` to unary input.
    ###"""
    ###def op(x):
        ###yield from map(operator, x)
    ###return op


#class MetaStream(abc.ABCMeta):

    #def __new__(cls, name, bases, attrs):

        ## Add binary operators
        #for op in streaming.operators._BINARY_OPERATORS:
            #attrs['__'+op+'__'] = getattr(streaming.operators, op)
            ##attrs['__r'+op+'__'] = _reversed_binary_operator(getattr(operator, op))
            ##attrs['__'+op+'__'] = _binary_operator(getattr(operator, op))
            ##attrs['__r'+op+'__'] = _reversed_binary_operator(getattr(operator, op))
            ##setattr(cls, '__'+op+'__', getattr(streaming.operators, op))
        #attrs['__div__'] = attrs['__truediv__']

        ### Add unary operators
        ##for operator in _UNARY_OPERATORS:
            ##attrs['__'+operator+'__'] = _unary_operator(operator)

        #return super().__new__(cls, name, bases, attrs)


def _wrapped_binary_op(op):
    def wrapped(a, b):
        return op(a, b)
    return wrapped

def _reverse_wrapped_binary_op(op):
    """Swap inputs."""
    def reverser(a, b):
        #print("Op: {}, Left: {}, Right {}".format(op, a, b))
        def wrapped(x, y):
            #print("Reversed, now Op: {}, Left: {}, Right {}".format(op, x, y))
            return op(x, y)
        return wrapped(b, a)
    return reverser


class Operators(abc.ABCMeta):

    def __new__(cls, name, bases, attrs):

        cls = super().__new__(cls, name, bases, attrs)

        import streaming

        # Add binary operators
        for op in streaming.operators._BINARY_OPERATORS:
            setattr(cls, '__'+op+'__', _wrapped_binary_op(getattr(streaming.operators, op)))
        for op in streaming.operators._BINARY_OPERATORS:#_WITH_REVERSE:
            setattr(cls, '__r'+op+'__', _reverse_wrapped_binary_op(getattr(streaming.operators, op)))
        return cls


[docs]class AbstractStream(collections.Iterator, metaclass=Operators): """Abstract stream.""" __array_priority__ = 1000 """Array priority is required to force usage of __radd__ in case left-side object is an `np.ndarray`.""" __numpy_ufunc__ = None """Disable np ufuncs entirely, force usage of __rop__.""" def __init__(self, iterator): if not isinstance(iterator, collections.abc.Iterable): raise ValueError("Iterable required, iterator preferred.") elif isinstance(iterator, collections.abc.Iterable): iterator = iter(iterator) # Try and find the actual iterator # Could move this into elif obj = iterator while hasattr(obj, '_iterator'): obj = obj._iterator self._iterator = obj super().__init__() ##def __bool__(self): #return True def __iter__(self): yield from self._iterator def __next__(self): return next(self._iterator) #def _fastmap(self, func): #return self.map(func) @abc.abstractproperty def nblock(self): """Amount of samples per block.""" pass @abc.abstractmethod def _construct(self, iterable): """Construct instance of this type from given `iterable`""" pass @abc.abstractmethod
[docs] def blocks(self): """Stream with blocks. :rtype: :class:`BlockStream` """ pass
[docs] def cos(self): """Cosine.""" return self.map(np.cos)
#@abc.abstractmethod
[docs] def copy(self): """Make a copy of the stream.""" self._iterator, iterator = itertools.tee(self._iterator) return self._construct(iterator)
[docs] def cycle(self): """Cycle the stream. .. seealso:: :func:`itertools.cycle` """ return streaming.itertools.cycle(self)
@abc.abstractmethod
[docs] def drop(self, n): """Drop the first `n` items.""" return self._construct(cytoolz.drop(n, self))
[docs] def exp(self): """Exponential.""" return self.map(np.exp)
@abc.abstractmethod
[docs] def map(self, func): """Map `func` to each sample in `Stream`. """ return AbstractStream(map(func, self._iterator))
#stream = self.samples() #return type(stream)(map(func, stream)) #@abc.abstractmethod #def mapblock(self, func): #pass
[docs] def nsamples(self): """Amount of samples in stream. .. warning:: This consumes the stream. """ return cytoolz.count(self.samples())
[docs] def peek(self): """Check the first item in the stream.""" first, self._iterator = cytoolz.peek(self._iterator) return first
[docs] def repeat_each(self, n): """Repeat each item `n` times before yielding the next. """ return repeat_each(self, n)
@abc.abstractmethod
[docs] def samples(self): """Stream with samples. :rtype: :class:`Stream` """ pass
[docs] def sin(self): """Sine.""" return self.map(np.sin)
[docs] def sqrt(self): """Square root.""" return self.map(np.sqrt)
@abc.abstractmethod
[docs] def take(self, n): """Take the first `n`.""" return self._construct(cytoolz.take(n, self._iterator))
[docs] def take_nth(self, n): """Take every `n`th.""" return self._construct(cytoolz.take_nth(n, self._iterator))
[docs] def tan(self): """Tangens.""" return self.map(np.tan)
[docs] def tee(self, n=2): """Split stream in `n` streams. .. seealso:: :func:`itertools.tee` """ return tee(self, n=n)
[docs] def toarray(self): """Convert to array. :rtype: :class:`np.ndarray` """ return toarray(self)
# Binary operators (AbstractStream, object) def _binary_op_abstractstream_object(op, a, b): return AbstractStream(op(i, b) for i in a._iterator) # Binary operators (object, AbstractStream) def _binary_op_object_abstractstream(op, a, b): return AbstractStream(op(a, i) for i in b._iterator) # Binary operators (AbstractStream, AbstractStream) def _binary_op_abstractstream_abstractstream(op, a, b): return AbstractStream(map(op, a._iterator, b._iterator)) for op in streaming.operators._BINARY_OPERATORS: # Get the dispatcher for this operation D = getattr(streaming.operators, op) # And add the specific implementations D.add((AbstractStream, AbstractStream), partial(_binary_op_abstractstream_abstractstream, D)) D.add((AbstractStream, object), partial(_binary_op_abstractstream_object, D)) D.add((object, AbstractStream), partial(_binary_op_object_abstractstream, D)) # Itertools @tee.register(AbstractStream) def _(iterable, n=2): return tuple(AbstractStream(it) for it in itertools.tee(iterable, n)) @cycle.register(AbstractStream) def _(iterable): return AbstractStream(iterable)(itertools.cycle(iterable)) # Other helpful functions def count(start=0, step=1): return AbstractStream(itertools.count(start=start, step=step)) @repeat_each.register(AbstractStream) def _(iterable, n): return AbstractStream(repeat_each(iterable._iterator, n))