1#!/usr/bin/env python3
2
3"""Utility decorator objects."""
4
5import os
6import sys
7import logging
8import functools
9from functools import partial
10from datetime import datetime, timedelta
11
12from collections import OrderedDict
13
14# Do not import anything from chart here as this module is imported
15# during early startup
16
17class memoized_noargs:
18 """Faster memoized wrapper for functions with no arguments."""
19
20 def __init__(self, func):
21 self.func = func
22 self.called = False
23 self.value = None
24
25 def __call__(self, *args):
26 if self.called:
27 return self.value
28
29 self.value = self.func()
30 self.called = True
31 return self.value
32
33 def __repr__(self):
34 """Return the function's docstring."""
35 return self.func.__doc__
36
37
38class memoized:
39 """Generic decorator that caches a function's return value each time it is called.
40
41 If called later with the same arguments, the cached value is returned and
42 not re-evaluated.
43 """
44
45 # you can make a version of memoized that works with optional arguments
46 # by cPickling the dictionary
47
48 def __init__(self, func):
49 self.func = func
50 self.cache = {}
51
52 def __call__(self, *args):
53 try:
54 return self.cache[args]
55 except KeyError:
56 self.cache[args] = value = self.func(*args)
57 return value
58 except TypeError:
59 # uncachable -- for instance, passing a list as an argument.
60 # Better to not cache than to blow up entirely.
61 return self.func(*args)
62
63 def __repr__(self):
64 """Return the function's docstring."""
65 return self.func.__doc__
66
67
68class memoized_kwargs:
69 """Remember a functions results. Works with functions that use kargs.
70
71 This may be slower than the original `memoized` call.
72 """
73
74 # you can make a version of memoized that works with optional arguments
75 # by cPickling the dictionary
76
77 def __init__(self, func):
78 self.func = func
79 self.cache = {}
80
81 def __call__(self, *args, **kwargs):
82 cargs = (args, frozenset(sorted(kwargs.items())))
83 try:
84 return self.cache[cargs]
85 except KeyError:
86 self.cache[cargs] = value = self.func(*args, **kwargs)
87 return value
88
89 def __repr__(self):
90 """Return the function's docstring."""
91 return self.func.__doc__
92
93
94class memoized2:
95 """Improved (but maybe slower) version of memoized that works with member functions."""
96
97 def __init__(self, func):
98 self.func = func
99 self.memoized = {}
100 self.method_cache = {}
101
102 def __call__(self, *args):
103 return self._cache_get(self.memoized,
104 args,
105 lambda: self.func(*args))
106
107 def __get__(self, obj, objtype):
108 return self._cache_get(self.method_cache,
109 obj,
110 lambda: self.__class__(functools.partial(self.func, obj)))
111
112 def _cache_get(self, cache, key, func):
113 """Return cached result, computing it if needed."""
114 try:
115 return cache[key]
116 except KeyError:
117 cache[key] = func()
118 return cache[key]
119
120
121# def memoized_named_class(cls):
122# """Convert a regular class to have a memoized constructor.
123# The constructor must accept a single hashable argument."""
124# cache = {}
125# def imp(name):
126# """Lookup instance from cache."""
127# result = cache.get(name)
128# if result is not None:
129# return result
130
131# result = cls(name)
132# cache[name] = result
133# return result
134
135# return imp
136
137
138class cached:
139 """Decorator that remembers the parameters and result of the previous call to the function."""
140
141 def __init__(self, func):
142 self.func = func
143 self.last_args = None
144 self.last_result = None
145
146 def __call__(self, *args):
147 if args == self.last_args:
148 return self.last_result
149
150 else:
151 self.last_args = args
152 self.last_result = self.func(*args)
153 return self.last_result
154 # return self.func(*args)
155
156 def __repr__(self):
157 """Return the function's docstring."""
158 return self.func.__doc__
159
160
161def lra_cache(maxsize=100):
162 """Least-recently-added cache decorator.
163 Arguments to the cached function must be hashable.
164 Cache performance statistics stored in f.hits and f.misses.
165 http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used.
166 """
167
168 def _decorating_function(user_function): # (missing docstring) pylint: disable=C0111
169 cache = OrderedDict()
170
171 @functools.wraps(user_function)
172 def wrapper(*args, **kwds): # (missing docstring) pylint: disable=C0111
173 key = args
174 if kwds:
175 key += tuple(sorted(kwds.items()))
176
177 try:
178 result = cache[key]
179 except KeyError:
180 result = user_function(*args, **kwds)
181 if len(cache) >= maxsize:
182 cache.popitem(last=False) # purge least recently used cache entry
183
184 cache[key] = result # record recent use of this key
185
186 # print cache
187 return result
188
189 return wrapper
190
191 return _decorating_function
192
193
194def memoized_expires(duration=timedelta(hours=24)):
195 """Like memoized but values are forgotten after `duration`."""
196
197 def _decorating_function(user_function): # (missing docstring) pylint: disable=C0111
198 cache = {}
199
200 @functools.wraps(user_function)
201 def wrapper(*args, **kwds): # (missing docstring) pylint: disable=C0111
202 key = args
203 if kwds:
204 key += tuple(sorted(kwds.items()))
205
206 result = cache.get(key)
207 if result is None or datetime.utcnow() > result[0]:
208 result = (datetime.utcnow() + duration,
209 user_function(*args, **kwds))
210
211 cache[key] = result # record recent use of this key
212
213 return result[1]
214
215 return wrapper
216
217 return _decorating_function
218
219
220def format_arg_value(arg_val):
221 """Return a string representing a (name, value) pair.
222
223 >>> format_arg_value(('x', (1, 2, 3)))
224 'x=(1, 2, 3)'
225 """
226 arg, val = arg_val
227 return "%(arg)s=%(val)r" % dict(arg=arg, val=val)
228
229
230def echo(fn, write=logging.debug):
231 """Echo calls to a function.
232
233 Returns a decorated version of the input function which "echoes" calls
234 made to it by writing out the function's name and the arguments it was
235 called with.
236 """
237 # Unpack function's arg count, arg names, arg defaults
238 code = fn.__code__
239 argcount = code.co_argcount
240 argnames = code.co_varnames[:argcount]
241 fn_defaults = fn.__defaults__ or list()
242 argdefs = dict(list(zip(argnames[-len(fn_defaults):], fn_defaults)))
243
244 @functools.wraps(fn)
245 def wrapped(*v, **k):
246 """Collect function arguments by chaining together positional,
247 # defaulted, extra positional and keyword arguments."""
248 positional = [format_arg_value(f) for f in zip(argnames, v)]
249 defaulted = [format_arg_value((a, argdefs[a]))
250 for a in argnames[len(v):] if a not in k]
251 nameless = [repr(i) for i in v[argcount:]]
252 keyword = [format_arg_value(i) for i in list(k.items())]
253 args = positional + defaulted + nameless + keyword
254 res = fn(*v, **k)
255 write("%(file)s:%(line)d %(fnname)s(%(args)s) returns %(res)s"
256 % {'fnname': fn.__name__,
257 'args': ", ".join(args),
258 'res': res,
259 'file': os.path.basename(sys._getframe(1).f_code.co_filename),
260 'line': sys._getframe(1).f_lineno})
261 return res
262
263 return wrapped
264
265
266def trace(func):
267 """Log the parameters and return value of a function. Does not work."""
268 def imp(func, *args, **kwargs):
269 """Implementation of trace()."""
270 logging.debug('Calling {func} with {args}, {kwargs}'.format(
271 func=func.__name__, args=args, kwargs=kwargs))
272 res = func(*args, **kwargs)
273 logging.debug('{func} returning {res}'.format(func=func.__name__, res=res))
274 return res
275
276 return partial(imp, func)
277
278# def trace(func):
279 # import decorator
280 # return decorator.decorator(_trace, func)