In [1]:
from whatever.callables import (
    SetCallable, TupleCallable, ListCallable, DictCallable, Dispatch
)
import builtins
from joblib import Parallel, delayed
import operator 
import toolz.curried
from toolz.curried import (
    complement, compose, filter, 
    first, identity, juxt, map, partial, 
    peek, pipe, merge, last, second, valfilter, keyfilter
)
from typing import Any, Callable, Iterable

__all__ = ['chain', 'Chain', '_x', '__x', '__p', '_this', 'compose']

In [2]:
def evaluate(args, kwargs, fn):
    """Evaluates `fn` with the `args` and `kwargs` as unique arguments.
    """
    return fn(*args, **kwargs)

class ComposerBase(object):
    def __init__(
        self, **kwargs
    ):
        self.attrs = pipe(
            self.imports
            , reversed,
            map(vars), merge, keyfilter(
                compose(str.islower, first),
            ),valfilter(callable),
        )
        self.attrs.update(
            
        )
    
class DefaultComposer(ComposerBase):
    imports = [
        toolz.curried, builtins, operator
    ]

    def item(self, item):
        return item
    
    def attr(self, item):
        return self.attrs.get(item)
    
    def call(self, tokens, *args, **kwargs):
        tokens[-1][1:] = args, kwargs
        return tokens
    
    def composer(self, tokens):
        return compose(*pipe(
            tokens, reversed, filter(first), map(
                lambda arg: partial(arg[0], *arg[1], **arg[2]) 
                    if any(arg[1:]) else arg[0]
            ), list
        ))

In [3]:
class ChainBase(object):
    
    def compute(self, fn, *args, **kwargs):                
        # If any new arguments have been supplied then use them        
        return fn(*args, **kwargs)
    
    def __dir__(self):
        return super().__dir__() + self._dir
    
    def _tokenize(self, composer, attr):
        attr = composer(attr)
        if (
            not isinstance(attr, Callable) and 
            isinstance(attr, Iterable) and 
            isinstance(peek(attr), Iterable)
        ):
            return attr
        return [[attr, [], ()]]

In [4]:
class Chain(ChainBase): 
    _composer = DefaultComposer()
    _dir = []
    
    def __init__(self, *args, **kwargs):        
        # Tokens record function, arguments, and keywork arguments.
        self._tokens = []
        
        # Default context to evaluate the chain.
        self._args, self._kwargs = args, kwargs
                
    def compute(self, *args, **kwargs)->[Any, None]:
        """Compose and evaluate the function.
        """
        return super().compute(
            self.compose, *args, **kwargs
        )
            
    def __getattr__(self, attr):
        """Apply the attribute getter
        """
        self._tokens.extend(
            self._tokenize(self._composer.attr, attr)
        )
        return self
    
    def __getitem__(self, item):
        """Any function in the item can be tokenized.
        """
        self._tokens.extend(
            self._tokenize(self._composer.item, item)
        )

        return self
    
    def __call__(self, *args, **kwargs):
        self._tokens = self._composer.call(
            self._tokens, *args, **kwargs
        )
        return self
    
    def copy(self, klass=None):
        """Create a new instance of the current chain.
        """
        chain = (
            klass if klass else self.__class__
        )(*self._args, **self._kwargs)
        chain._tokens = self._tokens.copy()
        return chain
    
    @property
    def compose(self):
        return self._composer.composer(self._tokens)
    
    def __dir__(self):
        return super().__dir__() \
            + list(self._composer.attrs.keys())

In [5]:
class chain(Chain):
    def __repr__(self):
        if self._args or self._kwargs:
            return self.compute(
                *self._args, **self._kwargs,
            ).__repr__()
        func = self.compose
        return '\n'.join([
            str(func.first),
            func.funcs.__repr__(),
        ])

In [6]:
class SugarComposer(DefaultComposer):    
    multiple_dispatch = Dispatch([
        [Callable, identity],
        [set, SetCallable],
        [list, ListCallable],                
        [tuple, TupleCallable],                                
        [dict, DictCallable],
        [Any, identity],])
    
    def item(self, item):
        return self.multiple_dispatch(item)    
    
    def call(self, tokens, *args, **kwargs):
        if pipe(tokens, last, first) is map:
            if kwargs and not args: 
                args = [kwargs]
            return super().call(
                tokens, pipe(
                    args, first, self.item,
                ),
            )
        return super().call(tokens, *args, **kwargs)

In [7]:
class LiterateAPI(chain):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        _len = pipe(
            self.__class__.__name__,
            map(str.isalnum),
            list,
            lambda x: x.index(True),
        )
        setattr( self, '_'*_len, self.compute)

In [8]:
def juxtapose(func, x): 
    return juxt(*func)(x)

class _x(LiterateAPI):
    """Initialize a chain delimited with single underscores.
    """
    _composer = SugarComposer()
        
    def __or__(self, f):
        """Extend the current chain.
        """
        return self.copy()[f]

    def __gt__(self, f)->[compose, Any]:
        """Extend and evaluate the chain.
        """
        if f is compose:
            # return toolz.compose
            return self.compose
        
        return self.copy()[f].compute()
    
    def compute(self, *args, **kwargs):
        if not(args or kwargs):
            args, kwargs = self._args, self._kwargs
        return super().compute(*args, **kwargs)

    def __mul__(self, f):
        """Apply a map function.
        """
        return self.copy().map(
            self._composer.item(f)
        )
        
    def __add__(self, f):
        """Filter values that are true.
        """
        return self.copy().filter(
            self._composer.item(f)
        )

In [9]:
class __x(_x): 
    def _(self, *args, **kwargs):
        raise AttributeError(
            "Compose this function using a dunder - __"
        )

In [10]:
def getitem(item, obj): 
    return obj[item]

def getattr_(item, obj):
    return getattr(obj, item)

class ThisComposer(DefaultComposer): 
    def item(self, item):
        return [[getitem, [item], {}]]
    
    def attr(self, item):
        return [[getattr_, [item], {}]]
    
    def call(self, tokens, *args, **kwargs):
        """Add args and kwargs to the tokens.
        """
        tokens.append([evaluate, [args, kwargs], {}])
        return tokens

In [11]:
class _this(_x): 
    """A chain object to access attributes and items.   Converts to 
    a chain when copied
    """
    _composer = ThisComposer()
    
    def __init__(self, arg=None,):
        """Accepts one input argument.
        """
        super().__init__(arg)
    
    @property
    def __dir__(self): 
        return super().__dir__() + pipe(self._args, first).__dir__
    
    def copy(self, klass=_x):
        """A new chain beginning with the current chain tokens and argument.
        """
        chain = super().copy()
        new_chain = klass(chain._args[0])
        new_chain._tokens = [[
                chain.compose, [], {},
            ]]
        return new_chain
    
    def __repr__(self):
        self._tokens = pipe(
            self._tokens, filter(
                compose(complement(
                    lambda s: s.startswith('_ipython') or
                        s.startswith('_repr') if isinstance(s, str) else s,
                ), first, second,)
            ), list
        )
        return super().__repr__()

In [12]:
class ParallelComposer(SugarComposer):
    def __init__(self, n_jobs=4):
        self.n_jobs=n_jobs
    
    def composer(self, tokens, **kwargs):
        rekey=[]
        for i, token in enumerate(tokens):
            token = [token]
            if first(token[0]) is map:
                token[0][1] = [
                    delayed(token[0][1][0])
                ]
                token.append([
                        Parallel(n_jobs=self.n_jobs), [], {}
                    ])
            rekey.extend(token)
        return super().composer(rekey)

In [13]:
class __p(__x):
    def __init__(self, *args, n_jobs=1, **kwargs):
        self._composer = ParallelComposer(
            n_jobs=n_jobs
        )
        super().__init__(*args, **kwargs)