1#!/usr/bin/env python3
  2
  3"""Utility decorator objects."""
  4
  5import os
  6import sys
  7import logging
  8import functools
  9from functools import partial
 10from datetime import datetime, timedelta
 11
 12from collections import OrderedDict
 13
 14# Do not import anything from chart here as this module is imported
 15# during early startup
 16
 17class memoized_noargs:
 18    """Faster memoized wrapper for functions with no arguments."""
 19
 20    def __init__(self, func):
 21        self.func = func
 22        self.called = False
 23        self.value = None
 24
 25    def __call__(self, *args):
 26        if self.called:
 27            return self.value
 28
 29        self.value = self.func()
 30        self.called = True
 31        return self.value
 32
 33    def __repr__(self):
 34        """Return the function's docstring."""
 35        return self.func.__doc__
 36
 37
 38class memoized:
 39    """Generic decorator that caches a function's return value each time it is called.
 40
 41    If called later with the same arguments, the cached value is returned and
 42    not re-evaluated.
 43    """
 44
 45    # you can make a version of memoized that works with optional arguments
 46    # by cPickling the dictionary
 47
 48    def __init__(self, func):
 49        self.func = func
 50        self.cache = {}
 51
 52    def __call__(self, *args):
 53        try:
 54            return self.cache[args]
 55        except KeyError:
 56            self.cache[args] = value = self.func(*args)
 57            return value
 58        except TypeError:
 59            # uncachable -- for instance, passing a list as an argument.
 60            # Better to not cache than to blow up entirely.
 61            return self.func(*args)
 62
 63    def __repr__(self):
 64        """Return the function's docstring."""
 65        return self.func.__doc__
 66
 67
 68class memoized_kwargs:
 69    """Remember a functions results. Works with functions that use kargs.
 70
 71    This may be slower than the original `memoized` call.
 72    """
 73
 74    # you can make a version of memoized that works with optional arguments
 75    # by cPickling the dictionary
 76
 77    def __init__(self, func):
 78        self.func = func
 79        self.cache = {}
 80
 81    def __call__(self, *args, **kwargs):
 82        cargs = (args, frozenset(sorted(kwargs.items())))
 83        try:
 84            return self.cache[cargs]
 85        except KeyError:
 86            self.cache[cargs] = value = self.func(*args, **kwargs)
 87            return value
 88
 89    def __repr__(self):
 90        """Return the function's docstring."""
 91        return self.func.__doc__
 92
 93
 94class memoized2:
 95    """Improved (but maybe slower) version of memoized that works with member functions."""
 96
 97    def __init__(self, func):
 98        self.func = func
 99        self.memoized = {}
100        self.method_cache = {}
101
102    def __call__(self, *args):
103        return self._cache_get(self.memoized,
104                               args,
105                               lambda: self.func(*args))
106
107    def __get__(self, obj, objtype):
108        return self._cache_get(self.method_cache,
109                               obj,
110                               lambda: self.__class__(functools.partial(self.func, obj)))
111
112    def _cache_get(self, cache, key, func):
113        """Return cached result, computing it if needed."""
114        try:
115            return cache[key]
116        except KeyError:
117            cache[key] = func()
118            return cache[key]
119
120
121# def memoized_named_class(cls):
122#     """Convert a regular class to have a memoized constructor.
123#     The constructor must accept a single hashable argument."""
124#     cache = {}
125#     def imp(name):
126#         """Lookup instance from cache."""
127#         result = cache.get(name)
128#         if result is not None:
129#             return result
130
131#         result = cls(name)
132#         cache[name] = result
133#         return result
134
135#     return imp
136
137
class cached:
    """Decorator that remembers the parameters and result of the previous call to the function.

    Only the single most recent call is remembered; calling again with equal
    positional arguments returns the stored result without re-evaluating.
    """

    def __init__(self, func):
        self.func = func
        # `None` never compares equal to an argument tuple, so the first call
        # always evaluates the function.
        self.last_args = None
        self.last_result = None

    def __call__(self, *args):
        """Return the stored result if `args` equal the previous call's."""
        if args == self.last_args:
            return self.last_result

        # Evaluate first and only then record the arguments: if `func`
        # raises, the previous (args, result) pair must stay consistent so a
        # retry is not answered with a result belonging to other arguments.
        result = self.func(*args)
        self.last_args = args
        self.last_result = result
        return result

    def __repr__(self):
        """Return the function's docstring, or its repr if it has none."""
        doc = self.func.__doc__
        # `__repr__` must return a str; a missing docstring (None) would
        # otherwise make `repr()` raise TypeError.
        return doc if doc is not None else repr(self.func)
159
160
def lra_cache(maxsize=100):
    """Least-recently-added (first-in, first-out) cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.

    Entries are evicted in the order they were added; a cache hit does NOT
    refresh an entry, so this is not an LRU cache. See
    http://en.wikipedia.org/wiki/Cache_algorithms for background.

    `maxsize` is the maximum number of remembered results. `None` means
    unbounded; zero or a negative value disables caching.
    """

    def _decorating_function(user_function):  # (missing docstring) pylint: disable=C0111
        cache = OrderedDict()
        # Unique marker placed between positional and keyword parts of the
        # key so that f(('a', 1)) and f(a=1) do not share a cache slot.
        kwd_mark = (object(),)

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):  # (missing docstring) pylint: disable=C0111
            key = args
            if kwds:
                key += kwd_mark + tuple(sorted(kwds.items()))

            try:
                result = cache[key]
            except KeyError:
                pass
            else:
                wrapper.hits += 1
                return result

            wrapper.misses += 1
            result = user_function(*args, **kwds)

            if maxsize is None:
                cache[key] = result
            elif maxsize > 0:
                if len(cache) >= maxsize:
                    cache.popitem(last=False)  # purge the oldest-added entry

                cache[key] = result

            return result

        # Set after functools.wraps so copied function attributes cannot
        # clobber the counters.
        wrapper.hits = 0
        wrapper.misses = 0
        return wrapper

    return _decorating_function
192
193
def memoized_expires(duration=timedelta(hours=24)):
    """Like memoized but values are forgotten after `duration`.

    `duration` is a `timedelta`. A result is reused until the current UTC
    time is later than the time the computing call started plus `duration`;
    after that the next call re-evaluates the function. Arguments must be
    hashable. Expired entries are only replaced when the same arguments are
    used again; they are never purged otherwise.
    """
    # Local import: only `datetime` and `timedelta` are imported at module level.
    from datetime import timezone

    def _decorating_function(user_function):  # (missing docstring) pylint: disable=C0111
        cache = {}  # key -> (expiry time, result)
        # Unique marker placed between positional and keyword parts of the
        # key so that f(('a', 1)) and f(a=1) do not share a cache slot.
        kwd_mark = (object(),)

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):  # (missing docstring) pylint: disable=C0111
            key = args
            if kwds:
                key += kwd_mark + tuple(sorted(kwds.items()))

            # Timezone-aware "now"; datetime.utcnow() is deprecated.
            now = datetime.now(timezone.utc)
            entry = cache.get(key)
            if entry is None or now > entry[0]:
                # The expiry is fixed before the function runs, so a slow
                # call does not extend the lifetime of its own result.
                entry = (now + duration, user_function(*args, **kwds))
                cache[key] = entry

            return entry[1]

        return wrapper

    return _decorating_function
218
219
def format_arg_value(arg_val):
    """Render a (name, value) pair as ``name=<repr of value>``.

    The name is formatted with `str()` and the value with `repr()`.

    >>> format_arg_value(('x', (1, 2, 3)))
    'x=(1, 2, 3)'
    """
    name, value = arg_val
    fields = {'arg': name, 'val': value}
    return "%(arg)s=%(val)r" % fields
228
229
def echo(fn, write=logging.debug):
    """Echo calls to a function.

    Returns a decorated version of the input function which "echoes" calls
    made to it by writing out the function's name and the arguments it was
    called with.

    `fn` must expose `__code__` and `__defaults__` (i.e. be a plain Python
    function). `write` is called once per completed call with a single
    string of the form ``file:line name(args) returns result``, where file
    and line identify the caller. Nothing is written if `fn` raises.
    """
    # Unpack function's arg count, arg names, arg defaults
    code = fn.__code__
    argcount = code.co_argcount
    argnames = code.co_varnames[:argcount]
    fn_defaults = fn.__defaults__ or ()
    # Defaults belong to the last len(fn_defaults) positional parameters.
    argdefs = dict(zip(argnames[argcount - len(fn_defaults):], fn_defaults))

    @functools.wraps(fn)
    def wrapped(*v, **k):
        # Collect function arguments by chaining together positional,
        # defaulted, extra positional and keyword arguments.
        positional = [format_arg_value(pair) for pair in zip(argnames, v)]
        # Parameters without a default are skipped here: if one is missing,
        # the call below raises the usual TypeError instead of a KeyError
        # from this bookkeeping.
        defaulted = [format_arg_value((name, argdefs[name]))
                     for name in argnames[len(v):]
                     if name not in k and name in argdefs]
        nameless = [repr(extra) for extra in v[argcount:]]
        keyword = [format_arg_value(item) for item in k.items()]
        args = positional + defaulted + nameless + keyword
        res = fn(*v, **k)
        # Frame 1 is the code that called the decorated function.
        caller = sys._getframe(1)
        write("%(file)s:%(line)d %(fnname)s(%(args)s) returns %(res)s"
              % {'fnname': fn.__name__,
                 'args': ", ".join(args),
                 'res': res,
                 'file': os.path.basename(caller.f_code.co_filename),
                 'line': caller.f_lineno})
        return res

    return wrapped
264
265
def trace(func):
    """Log the parameters and return value of a function.

    Returns a wrapper that writes a debug message through the root logger
    before calling `func` and another one with the result afterwards. The
    wrapper is a real function (not a `functools.partial`), so it also
    binds `self` correctly when used on methods and keeps the name and
    docstring of `func`.
    """
    @functools.wraps(func)
    def imp(*args, **kwargs):
        # Implementation of trace().
        logging.debug('Calling {func} with {args}, {kwargs}'.format(
            func=func.__name__, args=args, kwargs=kwargs))
        res = func(*args, **kwargs)
        logging.debug('{func} returning {res}'.format(func=func.__name__, res=res))
        return res

    return imp
277
278# def trace(func):
279    # import decorator
280    # return decorator.decorator(_trace, func)