def verify_request_signature(func):
@wraps(func)
def decorated(*args, **kwargs):
signature = request.headers.get('x-hub-signature', None)
if signature:
elements = signature.split('=')
method = elements[0]
signature_hash = elements[1]
expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest()
if signature_hash != expected_hash:
LOGGER.error('Signature was invalid')
return make_response('', 403)
else:
LOGGER.error('Could not validate the signature')
return func(*args, **kwargs)
return decorated
python类wraps()的实例源码
def logger(command):
"""
Add a logger to the decorated command.
"""
@wraps(command)
# pylint: disable=bad-whitespace
# pylint: disable=unused-argument
def wrapper(bot, update, **kwargs):
command( update, **kwargs)
if command.__name__ == 'unknown':
command.__name__ = _get_command_name(update.message.text)
message = LOG_TEMPLATE.format(user=update.user.first_name,
command=command.__name__)
logging.info(message)
return wrapper
def run_in_executor(f):
"""
A decorator to run the given method in the ThreadPoolExecutor.
"""
@wraps(f)
def new_f(self, *args, **kwargs):
if self.is_shutdown:
return
try:
future = self.executor.submit(f, self, *args, **kwargs)
future.add_done_callback(_future_completed)
except Exception:
log.exception("Failed to submit task to executor")
return new_f
def handle_bad_input_mc(f):
@wraps(f)
def handle(*args, **kwargs):
pre_type = args[0].pre_type
if pre_type == PreType.None:
return handle_bad_input(f)(*args, **kwargs)
EType = {
PreType.SimplePre : SimplePre.InvalidMcOperation,
PreType.SimplePreLAG : SimplePreLAG.InvalidMcOperation
}[pre_type]
Codes = {
PreType.SimplePre : SimplePre.McOperationErrorCode,
PreType.SimplePreLAG : SimplePreLAG.McOperationErrorCode
}[pre_type]
try:
return handle_bad_input(f)(*args, **kwargs)
except EType as e:
error = Codes._VALUES_TO_NAMES[e.code]
print "Invalid PRE operation (%s)" % error
return handle
def validate(validation_class, silent=not settings.DEBUG):
def decorator(fun):
@functools.wraps(fun)
def wrapper(*a, **kw):
if flask.request.method == 'GET':
p = {k: v for k, v in flask.request.values and flask.request.values.items() or {}}
else:
try:
p = flask.request.data and json.loads(flask.request.data.decode())
except Exception:
raise exceptions.WrongParametersException
if silent:
try:
validation_class(p)
except PyCombValidationError:
raise exceptions.WrongParametersException
else:
validation_class(p)
return fun(*a, params=p, **kw)
return wrapper
return decorator
def handle_errors(f):
"""A decorator that allows to ignore certain types of errors."""
@functools.wraps(f)
def wrapper(*args, **kwargs):
param_name = 'ignore_errors'
ignored_errors = kwargs.get(param_name, tuple())
if param_name in kwargs:
del kwargs[param_name]
try:
return f(*args, **kwargs)
except ignored_errors:
# Silently ignore errors
pass
return wrapper
def add_arg_scope(func):
"""Decorates a function with args so it can be used within an arg_scope.
Args:
func: function to decorate.
Returns:
A tuple with the decorated function func_with_args().
"""
@functools.wraps(func)
def func_with_args(*args, **kwargs):
current_scope = _current_arg_scope()
current_args = kwargs
key_func = (func.__module__, func.__name__)
if key_func in current_scope:
current_args = current_scope[key_func].copy()
current_args.update(kwargs)
return func(*args, **current_args)
_add_op(func)
return func_with_args
def locked(path, timeout=None):
"""Decorator which enables locks for decorated function.
Arguments:
- path: path for lockfile.
- timeout (optional): Timeout for acquiring lock.
Usage:
@locked('/var/run/myname', timeout=0)
def myname(...):
...
"""
def decor(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock = FileLock(path, timeout=timeout)
lock.acquire()
try:
return func(*args, **kwargs)
finally:
lock.release()
return wrapper
return decor
def overloaded(func):
@wraps(func)
def overloaded_func(*args, **kwargs):
for f in overloaded_func.overloads:
try:
return f(*args, **kwargs)
except TypeError:
pass
else:
# it will be nice if the error message prints a list of
# possible signatures here
raise TypeError("No compatible signatures")
def overload_with(func):
overloaded_func.overloads.append(func)
return overloaded_func
overloaded_func.overloads = [func]
overloaded_func.overload_with = overload_with
return overloaded_func
#############
def consumer(func):
"""A decorator, advances func to its first yield point when called.
Modifed this original example code from PEP 342 to use the new
functools.wraps decorator. This convenience function makes it look
like the original function, which is almost always what we want,
especially if we designed the original function to be wrapped in
the first place!
Maybe `consumer` should go into functools too!
"""
from functools import wraps
@wraps(func)
def wrapper(*args,**kw):
gen = func(*args, **kw)
gen.next()
return gen
return wrapper
def retry_with(handle_retry, exceptions, conditions, max_attempts):
def wrapper(func):
@functools.wraps(func)
def decorated(*args, **kwargs):
error = None
result = None
for attempt in range(1, max_attempts + 1):
try:
result = func(*args, **kwargs)
if any(guard(result) for guard in conditions):
handle_retry.retry(func, args, kwargs, None, attempt)
continue
return result
except Exception as err:
error = err
if any(isinstance(err, exc) for exc in exceptions):
handle_retry.retry(func, args, kwargs, err, attempt)
continue
break
logger.info('failed after {} attempts'.format(attempt))
if error is not None:
raise error
return result
return decorated
return wrapper
def restart_on_change(restart_map, stopstart=False, restart_functions=None):
"""Restart services based on configuration files changing
This function is used a decorator, for example::
@restart_on_change({
'/etc/ceph/ceph.conf': [ 'cinder-api', 'cinder-volume' ]
'/etc/apache/sites-enabled/*': [ 'apache2' ]
})
def config_changed():
pass # your code here
In this example, the cinder-api and cinder-volume services
would be restarted if /etc/ceph/ceph.conf is changed by the
ceph_client_changed function. The apache2 service would be
restarted if any file matching the pattern got changed, created
or removed. Standard wildcards are supported, see documentation
for the 'glob' module for more information.
@param restart_map: {path_file_name: [service_name, ...]
@param stopstart: DEFAULT false; whether to stop, start OR restart
@param restart_functions: nonstandard functions to use to restart services
{svc: func, ...}
@returns result from decorated function
"""
def wrap(f):
@functools.wraps(f)
def wrapped_f(*args, **kwargs):
return restart_on_change_helper(
(lambda: f(*args, **kwargs)), restart_map, stopstart,
restart_functions)
return wrapped_f
return wrap
def cached(func):
"""Cache return values for multiple executions of func + args
For example::
@cached
def unit_get(attribute):
pass
unit_get('test')
will cache the result of unit_get + 'test' for future calls.
"""
@wraps(func)
def wrapper(*args, **kwargs):
global cache
key = str((func, args, kwargs))
try:
return cache[key]
except KeyError:
pass # Drop out of the exception handler scope.
res = func(*args, **kwargs)
cache[key] = res
return res
wrapper._wrapped = func
return wrapper
def translate_exc(from_exc, to_exc):
def inner_translate_exc1(f):
@wraps(f)
def inner_translate_exc2(*args, **kwargs):
try:
return f(*args, **kwargs)
except from_exc:
raise to_exc
return inner_translate_exc2
return inner_translate_exc1
def os_requires_version(ostack_release, pkg):
"""
Decorator for hook to specify minimum supported release
"""
def wrap(f):
@wraps(f)
def wrapped_f(*args):
if os_release(pkg) < ostack_release:
raise Exception("This hook is not supported on releases"
" before %s" % ostack_release)
f(*args)
return wrapped_f
return wrap
def os_workload_status(configs, required_interfaces, charm_func=None):
"""
Decorator to set workload status based on complete contexts
"""
def wrap(f):
@wraps(f)
def wrapped_f(*args, **kwargs):
# Run the original function first
f(*args, **kwargs)
# Set workload status now that contexts have been
# acted on
set_os_workload_status(configs, required_interfaces, charm_func)
return wrapped_f
return wrap
def admin_login_required(method):
def is_admin(user):
if isinstance(user.is_admin, bool):
return user.is_admin
else:
return user.is_admin()
@functools.wraps(method)
def wrapper(*args, **kwargs):
if not current_user.is_authenticated:
flash("This section is for logged in users only.", 'warning')
return redirect(url_for('redberry.home'))
if not hasattr(current_user, 'is_admin'):
flash("Redberry expects your user instance to implement an `is_admin` boolean attribute "
"or an `is_admin()` method.", 'warning')
return redirect(url_for('redberry.home'))
if not is_admin(current_user):
flash("This section is for admin users only.", 'warning')
return redirect(url_for('redberry.home'))
return method(*args, **kwargs)
return wrapper
############
# CMS ROUTES
############
def with_context_manager(ctx_manager):
def decorator(fn):
@wraps(fn)
def context_manager_wrapper(*a, **kw):
with ctx_manager:
return fn(*a, **kw)
context_manager_wrapper.djpt_patched = True
return context_manager_wrapper
return decorator
def lazy_property(func):
attribute = '_lazy_' + func.__name__
@property
@functools.wraps(func)
def wrapper(self):
if not hasattr(self, attribute):
setattr(self, attribute, func(self))
return getattr(self, attribute)
return wrapper
# ?bias_shape ? None??????bias
def parse_args(**k):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
pv = {}
for key, val in k.iteritems():
arg_type, default_val = val
pv[key] = parse_single_arg(key, kwargs, arg_type, default_val)
kwargs.update(pv)
return func(*args, **kwargs)
return wrapper
return decorator