def _handle_parsing_error(func):
    """Decorator that converts parsing errors into ``IPMIFailure``.

    The wrapped callable is invoked with the single ``raw_data`` argument.
    ``IndexError``/``struct.error``, ``KeyError`` and ``ValueError`` raised
    by it are each re-raised as ``ironic_exception.IPMIFailure`` with a
    message naming the kind of problem. Any other exception propagates
    unchanged.
    """
    @six.wraps(func)
    def wrapper(raw_data):
        template = _('Data from Intel Node Manager %s')

        def failure(reason):
            # Only build the exception here; the ``raise`` statement stays
            # inside the ``except`` clause that triggered it.
            return ironic_exception.IPMIFailure(template % reason)

        try:
            parsed = func(raw_data)
        except (IndexError, struct.error):
            raise failure(_('has wrong length.'))
        except KeyError:
            raise failure(_('is corrupted.'))
        except ValueError:
            raise failure(_('cannot be converted.'))
        return parsed
    return wrapper
python类wraps()的实例源码
def _safe_operation(operation_name):
    """Build a decorator guarding a client operation on a resource.

    The decorated callable must receive the client instance, the resource
    name and the request context as its first three positional arguments.

    Before the call, the resource is checked against
    ``instance.operation_resources_map[operation_name]`` and the endpoint
    of the matching service is ensured. If the call fails with
    ``EndpointNotAvailable`` it is retried once, provided
    ``cfg.CONF.client.auto_refresh_endpoint`` is enabled; the endpoint is
    refreshed from keystone before the retry.

    :param operation_name: key into ``instance.operation_resources_map``.
    :raises exceptions.ResourceNotSupported: if the resource is not listed
        for ``operation_name``.
    :raises exceptions.EndpointNotAvailable: if the retry fails as well, or
        automatic endpoint refresh is disabled.
    """
    def handle_func(func):
        @six.wraps(func)
        def handle_args(*args, **kwargs):
            instance, resource, context = args[:3]
            if resource not in instance.operation_resources_map[
                    operation_name]:
                raise exceptions.ResourceNotSupported(resource, operation_name)
            retries = 1
            # ``range`` works on both Python 2 and 3; ``xrange`` does not
            # exist on Python 3.
            for attempt in range(retries + 1):
                try:
                    service = instance.resource_service_map[resource]
                    instance._ensure_endpoint_set(context, service)
                    return func(*args, **kwargs)
                except exceptions.EndpointNotAvailable as e:
                    # Give up on the last attempt or when refreshing the
                    # endpoint automatically is not allowed.
                    if (attempt == retries or
                            not cfg.CONF.client.auto_refresh_endpoint):
                        raise
                    # ``warning`` replaces the deprecated ``warn`` alias and
                    # the message is formatted lazily by the logger.
                    LOG.warning('%s, update endpoint and try again',
                                e.message)
                    instance._update_endpoint_from_keystone(context, True)
        return handle_args
    return handle_func
def __getattr__(self, item):
    """Return the proxied attribute wrapped so NDFrame inputs are converted.

    Raises ``SkipTest`` when ``self._pandas_only`` is set. Otherwise the
    attribute is looked up through the parent class and wrapped so that
    every positional or keyword argument that is an ``NDFrame`` instance is
    passed through ``self._convert`` before the call.
    """
    if self._pandas_only:
        raise SkipTest("empyrical.%s expects pandas-only inputs that have "
                       "dt indices/labels" % item)

    func = super(ConvertPandasEmpyricalProxy, self).__getattr__(item)

    def _maybe_convert(value):
        # Only pandas containers are converted; everything else passes
        # through untouched.
        if isinstance(value, NDFrame):
            return self._convert(value)
        return value

    @wraps(func)
    def convert_args(*args, **kwargs):
        converted_args = [_maybe_convert(arg) for arg in args]
        converted_kwargs = dict(
            (key, _maybe_convert(value))
            for key, value in iteritems(kwargs)
        )
        return func(*converted_args, **converted_kwargs)

    return convert_args
def interprocess_locked(path):
    """Return a decorator that runs the decorated function under a lock.

    A single ``InterProcessLock`` is created for ``path`` when this factory
    is called; it is entered around every call of the decorated function
    and left again when the call finishes.
    """
    shared_lock = InterProcessLock(path)

    def decorator(f):
        @six.wraps(f)
        def wrapper(*args, **kwargs):
            with shared_lock:
                outcome = f(*args, **kwargs)
            return outcome

        return wrapper

    return decorator
def keras_test(func):
    """Wrap a test function so the TensorFlow session is cleared afterwards.

    # Arguments
        func: test function to clean up after.

    # Returns
        A function that calls `func`, then calls `K.clear_session()` when
        `K.backend()` is `'tensorflow'`, and returns `func`'s result.
    """
    @six.wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        if K.backend() != 'tensorflow':
            return result
        K.clear_session()
        return result

    return wrapper
def __call__(self, func_or_cls):
    """Apply the skip condition to a function or to a class's ``setUp``.

    ``self.condition`` and ``self.reason`` are read once, at decoration
    time. A function is replaced by a wrapper that raises
    ``testtools.TestCase.skipException(reason)`` when the condition is
    truthy and otherwise calls the function. For a class, ``setUp`` is
    replaced in place by an equivalent guard and the class itself is
    returned.

    :raises TypeError: if ``func_or_cls`` is neither a function nor a class.
    """
    condition = self.condition
    reason = self.reason

    def _skip_if_needed():
        if condition:
            raise testtools.TestCase.skipException(reason)

    if inspect.isfunction(func_or_cls):
        @six.wraps(func_or_cls)
        def wrapped(*args, **kwargs):
            _skip_if_needed()
            return func_or_cls(*args, **kwargs)

        return wrapped

    if inspect.isclass(func_or_cls):
        orig_func = func_or_cls.setUp

        @six.wraps(orig_func)
        def new_func(self, *args, **kwargs):
            _skip_if_needed()
            # The return value of the original setUp is discarded.
            orig_func(self, *args, **kwargs)

        func_or_cls.setUp = new_func
        return func_or_cls

    raise TypeError('skipUnless can be used only with functions or '
                    'classes')
def error_trap(app_name):
    """Decorator trapping any error during application boot time.

    Any ``Exception`` raised by the decorated callable is logged, with its
    traceback, through a logger named after this module and is then
    re-raised unchanged.

    :param app_name: Application name
    :type app_name: str
    :return: _wrapper function
    """
    def _wrapper(func):
        # Copy metadata from the decorated callable itself. Wrapping
        # ``error_trap``/``_wrapper`` instead would make every decorated
        # function report the decorator factory's name and docstring.
        @six.wraps(func)
        def _inner_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                logger = log.getLogger(__name__)
                logger.exception(
                    'Failed to load application: \'{}\''.format(app_name))
                raise
        return _inner_wrapper
    return _wrapper
def keras_test(func):
    """Wrap a test function so the backend session is cleared around it.

    # Arguments
        func: test function to clean up after.

    # Returns
        A function that clears the session before and after calling `func`
        when `K.backend()` is `'tensorflow'` or `'mxnet'`, and returns
        `func`'s result.
    """
    def _reset_session():
        if K.backend() == 'tensorflow' or K.backend() == 'mxnet':
            K.clear_session()

    @six.wraps(func)
    def wrapper(*args, **kwargs):
        _reset_session()
        result = func(*args, **kwargs)
        _reset_session()
        return result

    return wrapper
def retry(exc_tuple, tries=5, delay=1, backoff=2):
    """Build a decorator that retries a call on the given exceptions.

    :param exc_tuple: exception class (or tuple of classes) that triggers a
        retry; any other exception propagates immediately.
    :param tries: retry budget. The call is attempted while the remaining
        budget is greater than one, i.e. at most ``tries - 1`` times.
    :param delay: seconds to sleep after the first failure.
    :param backoff: factor the delay is multiplied by after each failure.
    :raises SolidFireRequestException: when the budget is exhausted.
    """
    def retry_dec(f):
        @six.wraps(f)
        def func_retry(*args, **kwargs):
            _tries, _delay = tries, delay
            while _tries > 1:
                try:
                    return f(*args, **kwargs)
                except exc_tuple:
                    time.sleep(_delay)
                    _tries -= 1
                    _delay *= backoff
                    # NOTE(review): this logs every positional argument,
                    # which the note below says may be sensitive - confirm
                    # that is acceptable at debug level.
                    LOG.debug('Retrying %(args)s, %(tries)s attempts '
                              'remaining...',
                              {'args': args, 'tries': _tries})
            # NOTE(jdg): Don't log the params passed here
            # some cmds like createAccount will have sensitive
            # info in the params, grab only the second tuple
            # which should be the Method
            # Fall back to the function name so a short argument list cannot
            # raise IndexError and hide the retry failure.
            command = args[1] if len(args) > 1 else f.__name__
            # Build a plain string; a stray trailing comma previously turned
            # the message into a 1-tuple.
            msg = 'Retry count exceeded for command: %s' % (command,)
            LOG.error(msg)
            raise SolidFireRequestException(message=msg)
        return func_retry
    return retry_dec
firmware_controller.py 文件源码
项目:deb-python-proliantutils
作者: openstack
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def check_firmware_update_component(func):
    """Validate the component type before delegating to ``func``.

    The component type is lower-cased when truthy and must be a member of
    ``SUPPORTED_FIRMWARE_UPDATE_COMPONENTS``; otherwise the problem is
    logged and ``exception.InvalidInputError`` is raised.
    """
    @six.wraps(func)
    def wrapper(self, filename, component_type):
        """Wrapper around ``update_firmware`` call.

        :param filename: location of the raw firmware file.
        :param component_type: Type of component to be applied to.
        """
        if component_type:
            component_type = component_type.lower()

        if component_type in SUPPORTED_FIRMWARE_UPDATE_COMPONENTS:
            return func(self, filename, component_type)

        msg = ("Got invalid component type for firmware update: "
               "``update_firmware`` is not supported on %(component)s" %
               {'component': component_type})
        LOG.error(self._(msg))  # noqa
        raise exception.InvalidInputError(msg)

    return wrapper
def retry(*dargs, **dkw):
    """Wrap a function with a new `Retrying` object.

    Usable both bare (``@retry``) and called (``@retry(...)``). The retrying
    class is chosen per decorated function: ``AsyncRetrying`` for asyncio
    coroutine functions, ``TornadoRetrying`` for tornado coroutine
    functions, ``Retrying`` otherwise.

    :param dargs: positional arguments passed to Retrying object
    :param dkw: keyword arguments passed to the Retrying object
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        return retry()(dargs[0])

    def wrap(f):
        # ``asyncio``/``tornado`` are tested for truthiness before use.
        if asyncio and asyncio.iscoroutinefunction(f):
            retrying_cls = AsyncRetrying
        elif tornado and tornado.gen.is_coroutine_function(f):
            retrying_cls = TornadoRetrying
        else:
            retrying_cls = Retrying
        return retrying_cls(*dargs, **dkw).wraps(f)

    return wrap
def wraps(self, f):
    """Wrap a function for retrying.

    The returned wrapper forwards every call to ``self.call``. It carries
    two extra attributes: ``retry`` (this object) and ``retry_with`` (a
    callable that wraps ``f`` again using ``self.copy`` with the given
    arguments).

    :param f: A function to wrap for retrying.
    """
    @six.wraps(f)
    def wrapped_f(*args, **kw):
        return self.call(f, *args, **kw)

    def retry_with(*args, **kwargs):
        adjusted = self.copy(*args, **kwargs)
        return adjusted.wraps(f)

    wrapped_f.retry = self
    wrapped_f.retry_with = retry_with

    return wrapped_f
def catch_errors(func):
    """Decorator which catches all errors and tries to print them.

    ``DecapodAPIError`` is rendered with ``utils.format_output_json``, any
    other ``DecapodError`` is echoed to stderr. The click context is closed
    in every case. After a handled error the context exits with
    ``os.EX_SOFTWARE``.
    """
    @six.wraps(func)
    @click.pass_context
    def decorator(ctx, *args, **kwargs):
        try:
            return func(*args, **kwargs)
        except exceptions.DecapodAPIError as api_error:
            utils.format_output_json(ctx, api_error.json, True)
        except exceptions.DecapodError as error:
            text = six.text_type(error)
            click.echo(text, err=True)
        finally:
            ctx.close()

        # Reached only when one of the handlers above ran; a successful call
        # has already returned from inside the ``try`` block.
        ctx.exit(os.EX_SOFTWARE)

    return decorator
def with_color(func):
    """Decorator which adds --color option if available.

    When ``pygments`` is ``None`` no option is added; the function is
    instead always called with ``color=None``. Otherwise a ``--color``
    click option accepting ``light`` or ``dark`` is attached to ``func``.
    """
    if pygments is not None:
        add_color_option = click.option(
            "--color",
            default=None,
            type=click.Choice(["light", "dark"]),
            help=(
                "Colorize output. By default no color is used. "
                "Parameter means colorscheme of the terminal")
        )
        decorator = add_color_option(func)
    else:
        def decorator(*args, **kwargs):
            kwargs["color"] = None
            return func(*args, **kwargs)

    return six.wraps(func)(decorator)
def wrap_errors(func):
    """Decorator which logs and catches all errors of decorated function.

    A :py:exc:`DecapodError` is logged and re-raised as is. Every other
    ``Exception`` is logged with its traceback and re-raised wrapped into
    :py:exc:`DecapodAPIError`.

    :return: Value of decorated function.
    :raises decapodlib.exceptions.DecapodError: on any exception in
        decorated function.
    """
    @six.wraps(func)
    def decorator(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except exceptions.DecapodError as known_error:
            LOG.error("Error on access to API: %s", known_error)
            raise
        except Exception as unexpected:
            LOG.exception("Exception in decapodlib: %s", unexpected)
            raise exceptions.DecapodAPIError(unexpected)

    return decorator
def slugify_argument(func):
    """
    Decorate a string-returning function so it honours a 'slugify' keyword.

    The keyword is still forwarded to the wrapped function; when it is
    truthy the result is passed through ``_slugify``.

    >>> slugified_fn = slugify_argument(lambda *args, **kwargs: "YOU ARE A NICE LADY")
    >>> slugified_fn()
    'YOU ARE A NICE LADY'
    >>> slugified_fn(slugify=True)
    'you-are-a-nice-lady'
    """
    @six.wraps(func)
    def wrapped(*args, **kwargs):
        wants_slug = bool(kwargs.get("slugify"))
        text = func(*args, **kwargs)
        if wants_slug:
            return _slugify(text)
        return text

    return wrapped
def capitalize_argument(func):
    """
    Decorate a string-returning function so it honours a 'capitalize' keyword.

    The keyword is still forwarded to the wrapped function; when it is
    truthy the result is title-cased with ``str.title``.

    >>> capsified_fn = capitalize_argument(lambda *args, **kwargs: "what in the beeswax is this?")
    >>> capsified_fn()
    'what in the beeswax is this?'
    >>> capsified_fn(capitalize=True)
    'What In The Beeswax Is This?'
    """
    @six.wraps(func)
    def wrapped(*args, **kwargs):
        wants_title = bool(kwargs.get("capitalize"))
        text = func(*args, **kwargs)
        if wants_title:
            return text.title()
        return text

    return wrapped
def check_keyword_arguments(func):
    """Reject keyword arguments that the wrapped function did not use.

    The wrapper accepts keyword arguments only. ``object_type`` is required;
    it is removed before ``func`` is called and is used only in the error
    message. Every remaining keyword must also be present in the value
    returned by ``func``, otherwise ``exception.InvalidParameterValue`` is
    raised.
    """
    @six.wraps(func)
    def wrapper(**kw):
        obj_type = kw.pop('object_type')
        result = func(**kw)
        extra_args = set(kw) - set(result)
        if not extra_args:
            return result
        details = {"extra": ", ".join(extra_args),
                   "object_type": obj_type}
        raise exception.InvalidParameterValue(
            _("Unknown keyword arguments (%(extra)s) were passed "
              "while creating a test %(object_type)s object.") % details)

    return wrapper
def _translate_excp(func):
    """Decorator to translate automaton exceptions into mogan exceptions.

    ``InvalidState``, ``NotInitialized``, ``FrozenMachine`` and ``NotFound``
    become ``excp.InvalidState``; ``Duplicate`` becomes
    ``excp.DuplicateState``. The text of the original exception is used as
    the message.
    """
    @six.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (automaton_exceptions.InvalidState,
                automaton_exceptions.NotInitialized,
                automaton_exceptions.FrozenMachine,
                automaton_exceptions.NotFound) as state_error:
            text = six.text_type(state_error)
            raise excp.InvalidState(text)
        except automaton_exceptions.Duplicate as duplicate_error:
            text = six.text_type(duplicate_error)
            raise excp.DuplicateState(text)

    return wrapper
def ks_exceptions(f):
    """Wraps keystoneclient functions and centralizes exception handling.

    Exceptions from the wrapped call are mapped as follows:

    * ``EndpointNotFound`` -> ``CatalogNotFound``, using the
      ``service_type``/``endpoint_type`` keyword arguments of the call
      (defaulting to ``'baremetal'``/``'internal'``);
    * ``Unauthorized``, ``AuthorizationFailure`` -> ``KeystoneUnauthorized``;
    * ``NoMatchingPlugin``, ``MissingRequiredOptions`` -> ``ConfigInvalid``;
    * any other ``Exception`` is logged and becomes ``KeystoneFailure``.
    """
    @six.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except kaexception.EndpointNotFound:
            catalog_key = {
                'service_type': kwargs.get('service_type', 'baremetal'),
                'endpoint_type': kwargs.get('endpoint_type', 'internal'),
            }
            raise exception.CatalogNotFound(**catalog_key)
        except (kaexception.Unauthorized, kaexception.AuthorizationFailure):
            raise exception.KeystoneUnauthorized()
        except (kaexception.NoMatchingPlugin,
                kaexception.MissingRequiredOptions) as config_error:
            raise exception.ConfigInvalid(six.text_type(config_error))
        except Exception as error:
            LOG.exception('Keystone request failed: %(msg)s',
                          {'msg': six.text_type(error)})
            raise exception.KeystoneFailure(six.text_type(error))

    return wrapper
def authenticated(blocking=True):
    """Build a decorator that resolves the current user from the request.

    The session id is taken from the ``Authorization`` header and looked up
    in ``sessions``; the resulting user (or ``None``) is passed to the
    decorated function as the ``user`` keyword argument.

    :param blocking: when true, a request without a resolved user is
        rejected with ``ApiException('unauthorized', status_code=401)``.
    """
    def decorator(f):
        def _resolve_user():
            header = request.headers.get('Authorization')
            if not header or len(header) < 7:
                return None
            # Drops the first 7 characters (the length of 'Bearer '); the
            # prefix text itself is not verified.
            sid = header[7:]
            session = sessions.get(sid)
            if not (session and session.get('user_id')):
                return None
            found = get_user(session['user_id'])
            if found:
                found.sid = sid
            return found

        @wraps(f)
        def wrapper(*args, **kwargs):
            user = _resolve_user()
            if blocking and not user:
                raise ApiException('unauthorized', status_code=401)
            return f(*args, user=user, **kwargs)

        return wrapper

    return decorator
def keras_test(func):
    """Decorate a test so the backend session is reset around it.

    # Arguments
        func: test function to clean up after.

    # Returns
        A function wrapping the input function. When `K.backend()` is
        `'tensorflow'` or `'mxnet'`, `K.clear_session()` is called both
        before and after `func` runs.
    """
    @six.wraps(func)
    def wrapper(*args, **kwargs):
        def backend_keeps_session():
            return K.backend() == 'tensorflow' or K.backend() == 'mxnet'

        if backend_keeps_session():
            K.clear_session()
        output = func(*args, **kwargs)
        if backend_keeps_session():
            K.clear_session()
        return output

    return wrapper
def keras_test(func):
    """Decorate a test so the TensorFlow session is reset once it returns.

    # Arguments
        func: test function to clean up after.

    # Returns
        A function wrapping the input function. After `func` returns,
        `K.clear_session()` is called if `K.backend()` is `'tensorflow'`.
    """
    @six.wraps(func)
    def wrapper(*args, **kwargs):
        output = func(*args, **kwargs)
        uses_tensorflow = K.backend() == 'tensorflow'
        if uses_tensorflow:
            K.clear_session()
        return output

    return wrapper
def db_transaction(function):
    """Run the decorated function inside ``db_api.transaction()``.

    Before each call an Alembic ``Config`` is built from the ``alembic.ini``
    located next to this module, its script location is set, the Mistral
    ``CONF`` object is attached to it, and ``CONF(project='mistral')`` is
    invoked.
    """
    @six.wraps(function)
    def inner(*args, **kwargs):
        here = os.path.dirname(__file__)
        ini_path = os.path.join(here, 'alembic.ini')
        config = alembic_cfg.Config(ini_path)
        config.set_main_option(
            'script_location',
            'mistral.db.sqlalchemy.migration:alembic_migrations')

        # attach the Mistral conf to the Alembic conf
        config.mistral_config = CONF

        CONF(project='mistral')

        with db_api.transaction():
            outcome = function(*args, **kwargs)
        return outcome

    return inner
def expects_func_args(*args):
    """Check that a decorator is applied to a function with given arguments.

    Use it to decorate a decorator. When the inner decorator is applied,
    the innermost wrapped function (as returned by
    ``safe_utils.get_wrapped_function``) must declare every name listed in
    ``args`` as a positional parameter. Functions that accept ``*args`` or
    ``**kwargs`` are always accepted.

    :param args: parameter names the decorated function must declare.
    :raises TypeError: at decoration time, if a required name is missing.
    """
    def _decorator_checker(dec):
        @functools.wraps(dec)
        def _decorator(f):
            base_f = safe_utils.get_wrapped_function(f)
            # inspect.getargspec() was removed in Python 3.11. Prefer
            # getfullargspec() and fall back only on interpreters that
            # lack it (Python 2).
            get_spec = getattr(inspect, 'getfullargspec', None)
            if get_spec is None:
                get_spec = inspect.getargspec
            spec = get_spec(base_f)
            # The first three fields have the same meaning in both results:
            # positional names, *args name, **kwargs name.
            arg_names, varargs, varkw = spec[0], spec[1], spec[2]
            if varargs or varkw or set(args) <= set(arg_names):
                # NOTE : We can't really tell if correct stuff will
                # be passed if it's a function with *args or **kwargs so
                # we still carry on and hope for the best
                return dec(f)
            raise TypeError("Decorated function %(f_name)s does not "
                            "have the arguments expected by the "
                            "decorator %(d_name)s" %
                            {'f_name': base_f.__name__,
                             'd_name': dec.__name__})
        return _decorator
    return _decorator_checker
def spawn(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    The context returned by ``common_context.get_current()`` is captured
    when this function is called. If it is not ``None``, its
    ``update_store()`` is invoked inside the spawned callable before
    ``func`` runs.

    :returns: whatever ``eventlet.spawn`` returns.
    """
    captured_context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*call_args, **call_kwargs):
        # NOTE: If update_store is not called after spawn it won't be
        # available for the logger to pull from threadlocal storage.
        if captured_context is not None:
            captured_context.update_store()
        return func(*call_args, **call_kwargs)

    return eventlet.spawn(context_wrapper, *args, **kwargs)
def spawn_n(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn_n.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    The context returned by ``common_context.get_current()`` is captured
    when this function is called. If it is not ``None``, its
    ``update_store()`` is invoked inside the spawned callable before
    ``func`` runs. The result of ``func`` is discarded and nothing is
    returned.
    """
    captured_context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*call_args, **call_kwargs):
        # NOTE: If update_store is not called after spawn_n it won't be
        # available for the logger to pull from threadlocal storage.
        if captured_context is not None:
            captured_context.update_store()
        func(*call_args, **call_kwargs)

    eventlet.spawn_n(context_wrapper, *args, **kwargs)
def synchronized(name, semaphores=None, blocking=False):
    """Build a decorator that runs a function under a named internal lock.

    The lock name is ``'masakari-<name>'``. The lock is obtained from
    ``lockutils.internal_lock`` on every call and released once the
    decorated function finishes, whether it returns or raises.

    :param name: suffix of the lock name; also reported as the resource in
        ``LockAlreadyAcquired``.
    :param semaphores: forwarded to ``lockutils.internal_lock``.
    :param blocking: forwarded to the lock's ``acquire``.
    :raises exception.LockAlreadyAcquired: if ``acquire`` returns a false
        value.
    """
    def wrap(f):
        @six.wraps(f)
        def inner(*args, **kwargs):
            lock_name = 'masakari-%s' % name
            int_lock = lockutils.internal_lock(lock_name,
                                               semaphores=semaphores)
            log_details = {'lock_name': lock_name,
                           'resource': f.__name__}

            LOG.debug("Acquiring lock: %(lock_name)s on resource: "
                      "%(resource)s", log_details)
            acquired = int_lock.acquire(blocking=blocking)
            if not acquired:
                raise exception.LockAlreadyAcquired(resource=name)

            try:
                return f(*args, **kwargs)
            finally:
                LOG.debug("Releasing lock: %(lock_name)s on resource: "
                          "%(resource)s", log_details)
                int_lock.release()

        return inner

    return wrap
def eat_exceptions(function):
    """Report exceptions of the decorated function via ``error_and_quit``.

    * ``HTTPError`` with status 401 produces a hint to reconfigure the
      credentials; any other ``HTTPError`` reports the response content.
    * Any other ``Exception`` is reported as ``"<type name>: <message>"``.
      When ``DEBUG_MODE`` is enabled it is re-raised instead, so the
      failure stays visible rather than being dropped silently.
    """
    @six.wraps(function)
    def decorator(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except HTTPError as exception:
            if exception.response.status_code == 401:
                error_and_quit('Your authentication information may be incorrect. Please '
                               'reconfigure with ``dbfs configure``')
            else:
                error_and_quit(exception.response.content)
        except Exception as exception:  # noqa
            if DEBUG_MODE:
                # Previously the exception was swallowed here and the
                # wrapper returned None, hiding the failure entirely.
                raise
            error_and_quit('{}: {}'.format(type(exception).__name__, str(exception)))
    decorator.__doc__ = function.__doc__
    return decorator
def run_once(function, state={}, errors={}):
    """A memoization decorator, whose purpose is to cache calls.

    The result is cached per function, regardless of the arguments used.
    If the first call raised an ``Exception``, its ``sys.exc_info()`` is
    stored and re-raised on every later call. ``state`` and ``errors`` are
    the caches; the mutable defaults are shared by all decorated functions
    and keyed by the function object.
    """
    @six.wraps(function)
    def _wrapper(*args, **kwargs):
        if function in errors:
            # Deliberate use of LBYL.
            recorded = errors[function]
            six.reraise(*recorded)

        try:
            return state[function]
        except KeyError:
            # First successful lookup miss: compute and remember the value.
            try:
                result = function(*args, **kwargs)
                state[function] = result
                return result
            except Exception:
                errors[function] = sys.exc_info()
                raise

    return _wrapper