def auth(func):
@wraps(func)
def _decorator(request, *args, **kwargs):
auth = get_auth()
if auth.get('disable', False) is True:
return func(request, *args, **kwargs)
if 'authorized_ips' in auth:
ip = get_client_ip(request)
if is_authorized(ip, auth['authorized_ips']):
return func(request, *args, **kwargs)
prepare_credentials(auth)
if request.META.get('HTTP_AUTHORIZATION'):
authmeth, auth = request.META['HTTP_AUTHORIZATION'].split(' ')
if authmeth.lower() == 'basic':
auth = base64.b64decode(auth).decode('utf-8')
username, password = auth.split(':')
if (username == HEARTBEAT['auth']['username'] and
password == HEARTBEAT['auth']['password']):
return func(request, *args, **kwargs)
response = HttpResponse(
"Authentication failed", status=401)
response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"'
return response
return _decorator
python类wraps()的实例源码
def meta_compile(compile):
"Wraps the compile() method to do dark magic, see `meta_shellcode`."
@functools.wraps(compile)
def compile_wrapper(self, state = None):
if state is None:
state = CompilerState()
self.offset = state.offset
if hasattr(self, '_compile_hook'):
bytes = self._compile_hook(compile, state)
else:
bytes = compile(self, state)
state.next_piece( len(bytes) )
return bytes
return compile_wrapper
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def locked(path, timeout=None):
"""Decorator which enables locks for decorated function.
Arguments:
- path: path for lockfile.
- timeout (optional): Timeout for acquiring lock.
Usage:
@locked('/var/run/myname', timeout=0)
def myname(...):
...
"""
def decor(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock = FileLock(path, timeout=timeout)
lock.acquire()
try:
return func(*args, **kwargs)
finally:
lock.release()
return wrapper
return decor
def set_package(fxn):
"""Set __package__ on the returned module.
This function is deprecated.
"""
@functools.wraps(fxn)
def set_package_wrapper(*args, **kwargs):
warnings.warn('The import system now takes care of this automatically.',
DeprecationWarning, stacklevel=2)
module = fxn(*args, **kwargs)
if getattr(module, '__package__', None) is None:
module.__package__ = module.__name__
if not hasattr(module, '__path__'):
module.__package__ = module.__package__.rpartition('.')[0]
return module
return set_package_wrapper
def __getattr__(self, name):
# Attribute lookups are delegated to the underlying file
# and cached for non-numeric results
# (i.e. methods are cached, closed and friends are not)
file = self.__dict__['file']
a = getattr(file, name)
if hasattr(a, '__call__'):
func = a
@_functools.wraps(func)
def func_wrapper(*args, **kwargs):
return func(*args, **kwargs)
# Avoid closing the file as long as the wrapper is alive,
# see issue #18879.
func_wrapper._closer = self._closer
a = func_wrapper
if not isinstance(a, int):
setattr(self, name, a)
return a
# The underlying __enter__ method returns the wrong object
# (self.file) so override it to return the wrapper
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
logger.info ("get request, path: %s" % request.path)
token = request.form.get("token", None)
if (token == None):
logger.info ("get request without token, path: %s" % request.path)
return json.dumps({'success':'false', 'message':'user or key is null'})
result = post_to_user("/authtoken/", {'token':token})
if result.get('success') == 'true':
username = result.get('username')
beans = result.get('beans')
else:
return result
#if (cur_user == None):
# return json.dumps({'success':'false', 'message':'token failed or expired', 'Unauthorized': 'True'})
return func(username, beans, request.form, *args, **kwargs)
return wrapper
def test_wraps(self):
self.assert_ok("""\
from functools import wraps
def my_decorator(f):
dec = wraps(f)
def wrapper(*args, **kwds):
print('Calling decorated function')
return f(*args, **kwds)
wrapper = dec(wrapper)
return wrapper
@my_decorator
def example():
'''Docstring'''
return 17
assert example() == 17
""")
def locked(path, timeout=None):
"""Decorator which enables locks for decorated function.
Arguments:
- path: path for lockfile.
- timeout (optional): Timeout for acquiring lock.
Usage:
@locked('/var/run/myname', timeout=0)
def myname(...):
...
"""
def decor(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock = FileLock(path, timeout=timeout)
lock.acquire()
try:
return func(*args, **kwargs)
finally:
lock.release()
return wrapper
return decor
def ensure_empty_collections(*collections):
"""
Clears collections listed after function has completed.
Will throw an assertion if any collection is not empty when called.
"""
def clear(f):
@wraps(f)
def wrapper(*args, **kwargs):
db = api.common.get_conn()
collection_size = lambda name: len(list(db[name].find()))
for collection in collections:
assert collection_size(collection) == 0, "Collection was not empty: " + collection
result = f(*args, **kwargs)
return result
return wrapper
return clear
def clear_collections(*collections):
"""
Clears collections listed after function has completed.
Will throw an assertion if any collection is not empty when called.
"""
def clear(f):
@wraps(f)
def wrapper(*args, **kwargs):
db = api.common.get_conn()
try:
result = f(*args, **kwargs)
finally:
#Clear the collections.
for collection in collections:
db[collection].remove()
#Ensure they are then empty.
for collection in collections:
collection_size = lambda collection: len(list(db[collection].find()))
assert collection_size(collection) == 0, "Collection: {} was not able to be cleared.".format(collection)
return result
return wrapper
return clear
def block_before_competition(return_result):
"""
Wraps a routing function that should be blocked before the start time of the competition
"""
def decorator(f):
"""
Inner decorator
"""
@wraps(f)
def wrapper(*args, **kwds):
if datetime.utcnow().timestamp() > api.config.get_settings()["start_time"].timestamp():
return f(*args, **kwds)
else:
return return_result
return wrapper
return decorator
def _silent_connection_failure(func):
"""Decorator used to avoid raising an exception when the database timeouts
Parameters:
func: Function to decorate.
"""
@wraps(func)
def wrapper(*args, **kwargs):
"""Wraps the function to catch timeout exception.
"""
if not FLAGS.disable_mongodb_exception:
return func(*args, **kwargs)
try:
result = func(*args, **kwargs)
except pymongo.errors.ServerSelectionTimeoutError as e:
logging.error("Unable to reach the caching server: %s", e)
return None
return result
return wrapper
def array_stream(func):
"""
Decorates streaming functions to make sure that the stream
is a stream of ndarrays. Objects that are not arrays are transformed
into arrays. If the stream is in fact a single ndarray, this ndarray
is repackaged into a sequence of length 1.
The first argument of the decorated function is assumed to be an iterable of
arrays, or an iterable of objects that can be casted to arrays.
"""
@wraps(func) # thanks functools
def decorated(arrays, *args, **kwargs):
if isinstance(arrays, ndarray):
arrays = (arrays,)
return func(map(atleast_1d, arrays), *args, **kwargs)
return decorated
def call(f):
"""
Wrap a function in a processor that calls `f` on the argument before
passing it along.
Useful for creating simple arguments to the `@preprocess` decorator.
Parameters
----------
f : function
Function accepting a single argument and returning a replacement.
Usage
-----
>>> @preprocess(x=call(lambda x: x + 1))
... def foo(x):
... return x
...
>>> foo(1)
2
"""
@wraps(f)
def processor(func, argname, arg):
return f(arg)
return processor
def require_initialized(exception):
"""
Decorator for API methods that should only be called after
TradingAlgorithm.initialize. `exception` will be raised if the method is
called before initialize has completed.
Usage
-----
@require_initialized(SomeException("Don't do that!"))
def method(self):
# Do stuff that should only be allowed after initialize.
"""
def decorator(method):
@wraps(method)
def wrapped_method(self, *args, **kwargs):
if not self.initialized:
raise exception
return method(self, *args, **kwargs)
return wrapped_method
return decorator
def coerce_numbers_to_my_dtype(f):
"""
A decorator for methods whose signature is f(self, other) that coerces
``other`` to ``self.dtype``.
This is used to make comparison operations between numbers and `Factor`
instances work independently of whether the user supplies a float or
integer literal.
For example, if I write::
my_filter = my_factor > 3
my_factor probably has dtype float64, but 3 is an int, so we want to coerce
to float64 before doing the comparison.
"""
@wraps(f)
def method(self, other):
if isinstance(other, Number):
other = coerce_to_dtype(self.dtype, other)
return f(self, other)
return method
def with_algo(f):
name = f.__name__
if not name.startswith('test_'):
raise ValueError('This must decorate a test case')
tfm_name = name[len('test_'):]
@wraps(f)
def wrapper(self, data_frequency, days=None):
sim_params, source = self.sim_and_source[data_frequency]
algo = TradingAlgorithm(
initialize=initialize_with(self, tfm_name, days),
handle_data=handle_data_wrapper(f),
sim_params=sim_params,
env=self.env,
)
algo.run(source)
return wrapper
def authorized(admin_only=False):
def wrap(user_handler):
@wraps(user_handler)
def authorized_handler(self, *args, **kwargs):
self.set_cache(is_public=False)
request = self.request
if request.method == 'GET':
if not self.current_user_id:
next_url = self.get_argument('next', '/')
self.redirect(self.get_login_url() + "?next=" + next_url, status=302 if request.version == 'HTTP/1.0' else 303)
elif admin_only and not self.is_admin:
raise HTTPError(403)
else:
user_handler(self, *args, **kwargs)
elif not self.current_user_id:
raise HTTPError(403)
elif admin_only and not self.is_admin:
raise HTTPError(403)
else:
user_handler(self, *args, **kwargs)
return authorized_handler
return wrap
def memoized(func):
"""Decorator that caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned, and
the function is not re-evaluated.
Based upon from http://wiki.python.org/moin/PythonDecoratorLibrary#Memoize
Nota bene: this decorator memoizes /all/ calls to the function.
For a memoization decorator with limited cache size, consider:
http://code.activestate.com/recipes/496879-memoize-decorator-function-with-cache-size-limit/
"""
cache = {}
@wraps(func)
def func_wrapper(*args, **kwargs):
key = cPickle.dumps((args, kwargs))
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
return func_wrapper
def calculate_mantl_vars(func):
"""calculate Mantl vars"""
@wraps(func)
def inner(*args, **kwargs):
name, attrs, groups = func(*args, **kwargs)
# attrs
if attrs.get('role', '') == 'control':
attrs['consul_is_server'] = True
else:
attrs['consul_is_server'] = False
# groups
if attrs.get('publicly_routable', False):
groups.append('publicly_routable')
return name, attrs, groups
return inner
def _check_inputs_and_outputs(fcn):
@functools.wraps(fcn)
def call(conn, inputs, outputs, identifier, *a, **kw):
assert isinstance(inputs, (list, tuple, set)), inputs
assert isinstance(outputs, (list, tuple, set)), outputs
assert '' not in inputs, inputs
assert '' not in outputs, outputs
# this is for actually locking inputs/outputs
inputs, outputs = list(map(str, inputs)), list(map(str, outputs))
locks = inputs + [''] + outputs
if kw.pop('history', None):
igraph = [EDGE_RE.sub('*', inp) for inp in inputs]
ograph = [EDGE_RE.sub('*', out) for out in outputs]
graph_id = EDGE_RE.sub('*', str(identifier))
graph = igraph + [''] + ograph + ['', graph_id]
if all(x.startswith('test.') for x in igraph + ograph):
graph = ['', '']
else:
graph = ['', '']
return fcn(conn, locks, graph, str(identifier), *a, **kw)
return call
def immutable(cls):
cls.__frozen = False
def frozensetattr(self, key, value):
if self.__frozen and not hasattr(self, key):
print("Class {} is frozen. Cannot set {} = {}"
.format(cls.__name__, key, value))
else:
object.__setattr__(self, key, value)
def init_decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
func(self, *args, **kwargs)
self.__frozen = True
return wrapper
cls.__setattr__ = frozensetattr
cls.__init__ = init_decorator(cls.__init__)
return cls
def set_self_blocking(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
self = args[0]
try:
_is_blocking = self.gettimeout()
if _is_blocking == 0:
self.setblocking(True)
return function(*args, **kwargs)
except Exception as e:
raise
finally:
# set orgin blocking
if _is_blocking == 0:
self.setblocking(False)
return wrapper
def adapt_rgb(apply_to_rgb):
"""Return decorator that adapts to RGB images to a gray-scale filter.
This function is only intended to be used for functions that don't accept
volumes as input, since checking an image's shape is fragile.
Parameters
----------
apply_to_rgb : function
Function that returns a filtered image from an image-filter and RGB
image. This will only be called if the image is RGB-like.
"""
def decorator(image_filter):
@functools.wraps(image_filter)
def image_filter_adapted(image, *args, **kwargs):
if is_rgb_like(image):
return apply_to_rgb(image_filter, image, *args, **kwargs)
else:
return image_filter(image, *args, **kwargs)
return image_filter_adapted
return decorator
def patch(obj, func_name, before_func, after_func):
if not hasattr(obj, func_name):
return
target_func = getattr(obj, func_name)
# already patched
if hasattr(target_func, '__stackimpact_orig__'):
return
@wraps(target_func)
def wrapper(*args, **kwds):
if before_func:
before_func(*args, **kwds)
ret = target_func(*args, **kwds)
if after_func:
after_func(ret)
return ret
wrapper.__orig__ = target_func
setattr(obj, func_name, wrapper)
adapted.py 文件源码
项目:Daniel-Arbuckles-Mastering-Python
作者: PacktPublishing
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def adapted(func):
@wraps(func)
def wrapper(**kwargs):
final_args = {}
for name, value in kwargs.items():
adapt = func.__annotations__.get(name)
if adapt is not None:
final_args[name] = adapt(value)
else:
final_args[name] = value
result = func(**final_args)
adapt = func.__annotations__.get('return')
if adapt is not None:
return adapt(result)
return result
return wrapper
def admin_access():
def access(func):
@wraps(func)
def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
user = update.effective_user
admins = getattr(self, 'admins', None)
if admins is None:
self.logger.warning('Specify self.admins (list of users ids) parameter in '
'manager or channel classes in order to restrict access to bot.')
return func(self, bot, update, *args, **kwargs)
if user.id not in admins:
self.logger.info(f"Unauthorized access denied for {user}.")
return
return func(self, bot, update, *args, **kwargs)
return wrapped
return access
def __call__(self, func_or_msg=None, **kwargs):
""" If the first argument is a function, return a decorator which runs
the wrapped function inside Timer's context manager.
Otherwise, treat the first argument as a 'msg' string and return an updated
Timer instance, referencing the same logger.
A 'level' keyword can also be passed to override self.level.
"""
if isinstance(func_or_msg, collections.Callable):
func = func_or_msg
# use the function name when no explicit 'msg' is provided
if not self.msg:
self.msg = "run '%s'" % func.__name__
@wraps(func)
def wrapper(*args, **kwds):
with self:
return func(*args, **kwds)
return wrapper
else:
msg = func_or_msg or kwargs.get("msg")
level = kwargs.get("level", self.level)
return self.__class__(self.logger, msg, level)
def retry(times=3):
def wrapper(func):
@wraps(func)
def fun(*args, **kwargs):
count = 0
while count < times:
try:
return func(*args, **kwargs)
except Exception as e:
count = count + 1
logger.error("connection failed after retried 3 times. {} {}".format(args, kwargs))
raise Exception("connection failed after retried 3 times. {} {}".format(args, kwargs))
return fun
return wrapper