In [2]:
from abc import ABC, abstractmethod
from collections import namedtuple
import functools
import inspect
import re
In [3]:
class CommandBus(ABC):
@abstractmethod
def register(self, cmd, handler, *a, **k):
pass
@abstractmethod
def execute(self, cmd, *a, **k):
pass
In [4]:
# this is bottle-inject with the doc strings removed because scrolls bars
# https://github.com/bottlepy/bottle-inject
class InjectError(RuntimeError):
pass
class _InjectionPoint(object):
""" The object returned by :func:`inject`. """
def __init__(self, name, config=None, implicit=False):
self.name = name
self.config = config or {}
self.implicit = implicit
def __eq__(self, other):
if isinstance(other, _InjectionPoint):
return self.__dict__ == other.__dict__
return False
class _ProviderCache(dict):
""" A self-filling cache for :meth:`Injector.resolve` results. """
def __init__(self, injector):
super(_ProviderCache, self).__init__()
self.injector = injector
def __missing__(self, func):
self[func] = value = list(self.injector._resolve(func).items())
return value
def _unwrap(func):
if inspect.isclass(func):
func = func.__init__
while hasattr(func, '__wrapped__'):
func = func.__wrapped__
return func
def _make_null_resolver(name, provider):
msg = "The dependency provider for %r does not accept configuration (it is not a resolver)." % name
def null_resolver(*a, **ka):
if a or ka:
raise InjectError(msg)
return provider
return null_resolver
class Injector(object):
def __init__(self):
self.__cache = _ProviderCache(self)
self._resolvers = {}
self._never_inject = set(('self', ))
def add_value(self, name, value, alias=()):
""" Register a dependency value.
The dependency value is re-used for every injection and treated as a singleton.
:param name: Name of the injection point.
:param value: The singleton to provide.
:param alias: A list of alternative injection points.
:return: None
"""
self.add_provider(name, lambda: value, alias=alias)
def add_provider(self, name, func, alias=()):
self.add_resolver(name, _make_null_resolver(name, func), alias=alias)
def add_resolver(self, name, func, alias=()):
self._resolvers[name] = func
for name in alias:
self._resolvers[name] = func
self.__cache.clear()
def remove(self, name):
if self._resolvers.pop(name):
self.__cache.clear()
def provider(self, name, alias=()):
assert isinstance(name, str)
def decorator(func):
self.add_provider(name, func, alias=alias)
return func
return decorator
def resolver(self, name, alias=()):
def decorator(func):
self.add_resolver(name, func, alias=alias)
return func
return decorator
def inspect(self, func):
func = _unwrap(func)
if py32:
args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations = inspect.getfullargspec(func)
else:
args, varargs, keywords, defaults = inspect.getargspec(func)
kwonlyargs, kwonlydefaults, annotations = [], {}, {}
defaults = defaults or ()
kwonlydefaults = kwonlydefaults or {}
injection_points = {}
# Positional arguments without default value are potential injection points,
# but marked as 'implicit'.
for arg in args[:len(args) - len(defaults or [])]:
if arg not in self._never_inject:
injection_points[arg] = _InjectionPoint(arg, implicit=True)
for arg, value in zip(args[::-1], defaults[::-1]):
if isinstance(value, _InjectionPoint):
injection_points[arg] = value
for arg, value in kwonlydefaults.items():
if isinstance(value, _InjectionPoint):
injection_points[arg] = value
for arg, value in annotations.items():
if isinstance(value, _InjectionPoint):
injection_points[arg] = value
return injection_points
def _resolve(self, func):
results = {}
for arg, ip in self.inspect(func).items():
results[arg] = self._prime(ip)
return results
def _prime(self, ip):
try:
provider_resolver = self._resolvers[ip.name]
except KeyError:
err = InjectError("Could not resolve provider for injection point %r" % ip.name)
if not ip.implicit:
raise err
def fail_if_injected():
raise err
return fail_if_injected
provider = self.call_inject(provider_resolver, **ip.config)
return self.wrap(provider)
def call_inject(self, func, **ka):
for key, producer in self.__cache[func]:
if key not in ka:
ka[key] = producer()
return func(**ka)
def wrap(self, func):
cache = self.__cache # Avoid dot lookup in hot path
# Skip wrapping for functions with no injection points
if not self.inspect(func):
return func
@functools.wraps(func)
def wrapper(**ka):
# PERF: Inlined call_inject call. Keep in sync with the implementation above.
for key, producer in cache[func]:
if key not in ka:
ka[key] = producer()
return func(**ka)
wrapper.__injector__ = self
return wrapper
def inject(name, **kwargs):
return _InjectionPoint(name, config=kwargs)
In [26]:
class DI(ABC):
@abstractmethod
def register(self, provider, names):
pass
@abstractmethod
def make(self, wanted, **kwargs):
pass
class MyDI(DI):
def __init__(self, container):
self._container = container
def register(self, provider, name, *aliases):
self._container.add_provider(name, provider, aliases)
def make(self, wanted, **kwargs):
return self._container.call_inject(wanted, **kwargs)
In [27]:
injector = Injector()
syringe = MyDI(injector)
In [28]:
def get_type(obj):
if isinstance(obj, type):
return obj
return type(obj)
class NoHandler(Exception):
pass
class PermissionDenied(Exception):
pass
class DICB(CommandBus):
def __init__(self, dic, inflector):
self._dic = dic
self._cmd_map = {}
self._inflector = inflector
def register(self, cmd, handler, *a, **k):
self._cmd_map[get_type(cmd)] = handler
def execute(self, cmd, *a, **k):
try:
handler = self._dic.make(self._cmd_map[get_type(cmd)])
return self._inflector(handler, cmd)
except (KeyError, InjectError):
msg = "No valid handler found for: {!s}".format(cmd)
raise NoHandler(msg)
class PermissionedCB(CommandBus):
def __init__(self, next, throws=PermissionDenied):
self._next = next
self._cmd_perm_map = {}
self._throws = PermissionDenied
def register(self, cmd, handler, permissions, *a, **k):
self._cmd_perm_map[get_type(cmd)] = permissions
self._next.register(cmd, handler, *a, **k)
def execute(self, cmd, *a, **k):
if not self._resolve_permissions(cmd, *a, **k):
raise self._throws
return self._next.execute(cmd, *a, **k)
def _resolve_permissions(self, cmd, *a, **k):
perms = self._cmd_perm_map[get_type(cmd)]
return all(p(cmd, *a, **k) for p in perms)
In [29]:
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
all_cap_re = re.compile('([a-z0-9])([A-Z])')
def underscorize(name):
s1 = first_cap_re.sub(r'\1_\2', name)
return all_cap_re.sub(r'\1_\2', s1).lower()
def camel_to_underscore_inflector(handler, command):
cmd_type = get_type(command)
name = underscorize(cmd_type.__name__)
return getattr(handler, name)(**command._asdict())
In [30]:
c = PermissionedCB(DICB(syringe, camel_to_underscore_inflector))
In [31]:
class WantsJeff(object):
def __init__(self, jeff):
self._jeff = jeff
def list_jeffs(self, *a, **k):
print("Got: {} {}".format(a, k))
return self._jeff
In [32]:
def jeff():
return 'jeff'
syringe.register(jeff, 'jeff')
In [33]:
ListJeffs = namedtuple('ListJeffs', ['one', 'two'])
In [34]:
def BelongsToGroup(group):
def permission(cmd, request, *a, **k):
return group in request.user.groups
return permission
def ReadOnly(cmd, request, *a, **k):
if not request.method.lower() in {'get', 'head', 'options'}:
raise PermissionDenied("Not a readonly method")
In [35]:
c.register(ListJeffs, WantsJeff, [BelongsToGroup('jess'), ReadOnly])
In [36]:
class dotteddict(dict):
def __getattr__(self, attr):
return super().__getitem__(attr)
def __setattr__(self, attr, value):
super().__setitem__(attr, value)
In [37]:
request = dotteddict()
request.method = 'POST'
request.user = dotteddict()
request.user.groups = set(['fred'])
In [38]:
c.execute(ListJeffs(1, 2), request)
In [ ]: