def decorator(target):
"""A signature-matching decorator factory."""
def decorate(fn):
if not inspect.isfunction(fn):
raise Exception("not a decoratable function")
spec = compat.inspect_getfullargspec(fn)
names = tuple(spec[0]) + spec[1:3] + (fn.__name__,)
targ_name, fn_name = _unique_symbols(names, 'target', 'fn')
metadata = dict(target=targ_name, fn=fn_name)
metadata.update(format_argspec_plus(spec, grouped=False))
metadata['name'] = fn.__name__
code = """\
def %(name)s(%(args)s):
return %(target)s(%(fn)s, %(apply_kw)s)
""" % metadata
decorated = _exec_code_in_env(code,
{targ_name: target, fn_name: fn},
fn.__name__)
decorated.__defaults__ = getattr(fn, 'im_func', fn).__defaults__
decorated.__wrapped__ = fn
return update_wrapper(decorated, fn)
return update_wrapper(decorate, target)
python类update_wrapper()的实例源码
def application(cls, f):
"""Decorate a function as responder that accepts the request as first
argument. This works like the :func:`responder` decorator but the
function is passed the request object as first argument and the
request object will be closed automatically::
@Request.application
def my_wsgi_app(request):
return Response('Hello World!')
:param f: the WSGI callable to decorate
:return: a new WSGI callable
"""
#: return a callable that wraps the -2nd argument with the request
#: and calls the function with all the arguments up to that one and
#: the request. The return value is then called with the latest
#: two arguments. This makes it possible to use this decorator for
#: both methods and standalone WSGI functions.
def application(*args):
request = cls(args[-2])
with request:
return f(*args[:-2] + (request,))(*args[-2:])
return update_wrapper(application, f)
def webhook(f):
@csrf_exempt
def new_f(request, *args, **kwargs):
data = json.loads(request.body) or {}
with Context.for_request(request, data) as context:
return f(request, context, data, *args, **kwargs)
return update_wrapper(new_f, f)
def with_context(f):
def new_f(request, *args, **kwargs):
with Context.for_request(request) as context:
return f(request, context, *args, **kwargs)
return update_wrapper(new_f, f)
def allow_frame(f):
def new_f(request, *args, **kwargs):
resp = f(request, *args, **kwargs)
# put something here so that sentry does not overwrite it
# with deny.
resp['X-Frame-Options'] = 'allow'
return resp
return update_wrapper(new_f, f)
def cors(f):
def new_f(request, *args, **kwargs):
origin = request.META.get('HTTP_ORIGIN')
resp = f(request, *args, **kwargs)
resp['Access-Control-Allow-Origin'] = origin
resp['Access-Control-Request-Method'] = 'GET, HEAD, OPTIONS'
resp['Access-Control-Allow-Headers'] = 'X-Requested-With'
resp['Access-Control-Allow-Credentials'] = 'true'
resp['Access-Control-Max-Age'] = '1728000'
return resp
return update_wrapper(new_f, f)
def __call__(self, fn):
def _preparer_wrapper(test_class_instance, **kwargs):
self.live_test = not isinstance(test_class_instance, ReplayableTest)
self.test_class_instance = test_class_instance
if self.live_test or test_class_instance.in_recording:
resource_name = self.random_name
if not self.live_test and isinstance(self, RecordingProcessor):
test_class_instance.recording_processors.append(self)
else:
resource_name = self.moniker
with self.override_disable_recording():
parameter_update = self.create_resource(
resource_name,
**kwargs
)
test_class_instance.addCleanup(
lambda: self.remove_resource_with_record_override(resource_name, **kwargs)
)
if parameter_update:
kwargs.update(parameter_update)
if not is_preparer_func(fn):
# the next function is the actual test function. the kwargs need to be trimmed so
# that parameters which are not required will not be passed to it.
args, _, kw, _ = inspect.getargspec(fn) # pylint: disable=deprecated-method
if kw is None:
args = set(args)
for key in [k for k in kwargs if k not in args]:
del kwargs[key]
fn(test_class_instance, **kwargs)
setattr(_preparer_wrapper, '__is_preparer', True)
functools.update_wrapper(_preparer_wrapper, fn)
return _preparer_wrapper
def __call__(self, fn):
import_deps = self.import_deps
spec = compat.inspect_getfullargspec(fn)
spec_zero = list(spec[0])
hasself = spec_zero[0] in ('self', 'cls')
for i in range(len(import_deps)):
spec[0][i + (1 if hasself else 0)] = "import_deps[%r]" % i
inner_spec = format_argspec_plus(spec, grouped=False)
for impname in import_deps:
del spec_zero[1 if hasself else 0]
spec[0][:] = spec_zero
outer_spec = format_argspec_plus(spec, grouped=False)
code = 'lambda %(args)s: fn(%(apply_kw)s)' % {
"args": outer_spec['args'],
"apply_kw": inner_spec['apply_kw']
}
decorated = eval(code, locals())
decorated.__defaults__ = getattr(fn, 'im_func', fn).__defaults__
return update_wrapper(decorated, fn)
def _wrap_w_kw(self, fn):
def wrap(*arg, **kw):
return fn(*arg)
return update_wrapper(wrap, fn)
def responder(f):
"""Marks a function as responder. Decorate a function with it and it
will automatically call the return value as WSGI application.
Example::
@responder
def application(environ, start_response):
return Response('Hello World!')
"""
return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)
def middleware(self, func):
"""Like `make_middleware` but for decorating functions.
Example usage::
@manager.middleware
def application(environ, start_response):
...
The difference to `make_middleware` is that the function passed
will have all the arguments copied from the inner application
(name, docstring, module).
"""
return update_wrapper(self.make_middleware(func), func)
def native_string_result(func):
def wrapper(*args, **kwargs):
return func(*args, **kwargs).encode('utf-8')
return functools.update_wrapper(wrapper, func)
def emits_module_deprecation_warning(f):
def new_f(self, *args, **kwargs):
with catch_warnings() as log:
f(self, *args, **kwargs)
self.assert_true(log, 'expected deprecation warning')
for entry in log:
self.assert_in('Modules are deprecated', str(entry['message']))
return update_wrapper(new_f, f)
def setupmethod(f):
"""Wraps a method so that it performs a check in debug mode if the
first request was already handled.
"""
def wrapper_func(self, *args, **kwargs):
if self.debug and self._got_first_request:
raise AssertionError('A setup function was called after the '
'first request was handled. This usually indicates a bug '
'in the application where a module was not imported '
'and decorators or other functionality was called too late.\n'
'To fix this make sure to import all your view modules, '
'database models and everything related at a central place '
'before the application starts serving requests.')
return f(self, *args, **kwargs)
return update_wrapper(wrapper_func, f)
def record_once(self, func):
"""Works like :meth:`record` but wraps the function in another
function that will ensure the function is only called once. If the
blueprint is registered a second time on the application, the
function passed is not called.
"""
def wrapper(state):
if state.first_registration:
func(state)
return self.record(update_wrapper(wrapper, func))
def timing(func):
def wrapper(*args, **argd):
start_time = time.time()
ret = func(*args, **argd)
print 'function: %s used %.2f seconds' % (func.__name__, time.time()-start_time)
return ret
functools.update_wrapper(wrapper, func)
return wrapper
def cached(func):
cache = {}
def template(*args): #: template is wrapper; func is wrapped
key = (func, )+args
try:
ret = cache[key]
except KeyError:
ret = func(*args)
cache[key] = ret
else:
pass
return ret
functools.update_wrapper(template, func)
return template
def crossdomain(origin=None, methods=None, headers=None, max_age=21600, attach_to_all=True, automatic_options=True):
if methods is not None:
methods = ', '.join(sorted(x.upper() for x in methods))
if headers is not None and not isinstance(headers, list):
headers = ', '.join(x.upper() for x in headers)
if not isinstance(origin, list):
origin = ', '.join(origin)
if isinstance(max_age, timedelta):
max_age = max_age.total_seconds()
def get_methods():
if methods is not None:
return methods
options_resp = current_app.make_default_options_response()
return options_resp.headers['allow']
def decorator(f):
def wrapped_function(*args, **kwargs):
if automatic_options and request.method == 'OPTIONS':
resp = current_app.make_default_options_response()
else:
resp = make_response(f(*args, **kwargs))
if not attach_to_all and request.method != 'OPTIONS':
return resp
h = resp.headers
h['Access-Control-Allow-Origin'] = origin
h['Access-Control-Allow-Methods'] = get_methods()
h['Access-Control-Max-Age'] = str(max_age)
h['Access-Control-Allow-Credentials'] = 'true'
h['Access-Control-Allow-Headers'] = "Origin, X-Requested-With, Content-Type, Accept, Authorization"
if headers is not None:
h['Access-Control-Allow-Headers'] = headers
return resp
f.provide_automatic_options = False
return update_wrapper(wrapped_function, f)
return decorator
def ratelimit(limit, per, send_x_headers=True,
scope_func=lambda: request.remote_addr,
key_func=lambda: request.endpoint,
path=lambda: request.path):
"""
Decorator for limiting the access to a route.
Returns the function if within the limit, otherwise TooManyRequests error
"""
def decorator(f):
@wraps(f)
def rate_limited(*args, **kwargs):
try:
key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
rlimit = RateLimit(key, limit, per, send_x_headers)
g._view_rate_limit = rlimit
#if over_limit is not None and rlimit.over_limit:
if rlimit.over_limit:
raise TooManyRequests
return f(*args, **kwargs)
except Exception as e:
return error.format_exception(e, target=path(),
action=f.__name__)
return update_wrapper(rate_limited, f)
return decorator
def pass_context(f):
"""Marks a callback as wanting to receive the current context
object as first argument.
"""
def new_func(*args, **kwargs):
return f(get_current_context(), *args, **kwargs)
return update_wrapper(new_func, f)