def get_mon_map(service):
"""
Returns the current monitor map.
:param service: six.string_types. The Ceph user name to run the command under
:return: json string. :raise: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
try:
mon_status = check_output(['ceph', '--id', service,
'mon_status', '--format=json'])
if six.PY3:
mon_status = mon_status.decode('UTF-8')
try:
return json.loads(mon_status)
except ValueError as v:
log("Unable to parse mon_status json: {}. Error: {}"
.format(mon_status, str(v)))
raise
except CalledProcessError as e:
log("mon_status command failed with message: {}"
.format(str(e)))
raise
python类string_types()的实例源码
def hash_monitor_names(service):
"""
Uses the get_mon_map() function to get information about the monitor
cluster.
Hash the name of each monitor. Return a sorted list of monitor hashes
in an ascending order.
:param service: six.string_types. The Ceph user name to run the command under
:rtype : dict. json dict of monitor name, ip address and rank
example: {
'name': 'ip-172-31-13-165',
'rank': 0,
'addr': '172.31.13.165:6789/0'}
"""
try:
hash_list = []
monitor_list = get_mon_map(service=service)
if monitor_list['monmap']['mons']:
for mon in monitor_list['monmap']['mons']:
hash_list.append(
hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
return sorted(hash_list)
else:
return None
except (ValueError, CalledProcessError):
raise
def monitor_key_set(service, key, value):
"""
Sets a key value pair on the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to set.
:param value: The value to set. This will be converted to a string
before setting
"""
try:
check_output(
['ceph', '--id', service,
'config-key', 'put', str(key), str(value)])
except CalledProcessError as e:
log("Monitor config-key put failed with message: {}".format(
e.output))
raise
def monitor_key_get(service, key):
"""
Gets the value of an existing key in the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to search for.
:return: Returns the value of that key or None if not found.
"""
try:
output = check_output(
['ceph', '--id', service,
'config-key', 'get', str(key)]).decode('UTF-8')
return output
except CalledProcessError as e:
log("Monitor config-key get failed with message: {}".format(
e.output))
return None
def remove_pool_snapshot(service, pool_name, snapshot_name):
"""
Remove a snapshot from a RADOS pool in ceph.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:param snapshot_name: six.string_types
:return: None. Can raise CalledProcessError
"""
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
try:
check_call(cmd)
except CalledProcessError:
raise
# max_bytes should be an int or long
def get_cache_mode(service, pool_name):
"""
Find the current caching mode of the pool_name given.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:return: int or None
"""
validator(value=service, valid_type=six.string_types)
validator(value=pool_name, valid_type=six.string_types)
out = check_output(['ceph', '--id', service,
'osd', 'dump', '--format=json'])
if six.PY3:
out = out.decode('UTF-8')
try:
osd_json = json.loads(out)
for pool in osd_json['pools']:
if pool['pool_name'] == pool_name:
return pool['cache_mode']
return None
except ValueError:
raise
def add_roles(self, user, roles):
"""
Adds one or more roles to a user. Adding the
same role more than once has no effect.
user -- name of the user
roles -- one or more roles to add
"""
if user not in self.users:
raise UserNotDefined(user)
roles = (roles,) if isinstance(roles, six.string_types) else roles
for role in roles:
if ',' in role:
raise BadRoleError('\',\' not allowed in role name (%s) '
'for user %s' % (role, user))
self.roles[user].add(role)
def ng_call_scope_function(self, element, func, params='', return_out=False):
"""
:Description: Will execute scope function with provided parameters.
:Warning: This will only work for angular.js 1.x.
:Warning: Requires angular debugging to be enabled.
:param element: Element for browser instance to target.
:param func: Function to execute from angular element scope.
:type func: string
:param params: String (naked) args, or list of parameters to pass to target function.
:type params: string, tuple, list
:param return_out: Return output of function call otherwise None
:type return_out: bool
"""
if isinstance(params, string_types):
param_str = params
elif isinstance(params, (tuple, list)):
param_str = self.__serialize_params(params)
else:
raise ValueError('Invalid type specified for function parameters')
exec_str = 'angular.element(arguments[0]).scope().%s(%s);' % (func, param_str)
if return_out:
return self.__type2python(
self.browser.execute_script('return {}'.format(exec_str), element))
else:
self.browser.execute_script(exec_str, element)
def ng_call_ctrl_function(self, element, func, params='', return_out=False):
"""
:Description: Will execute controller function with provided parameters.
:Warning: This will only work for angular.js 1.x.
:Warning: Requires angular debugging to be enabled.
:param element: Element for browser instance to target.
:param func: Function to execute from angular element controller.
:type func: string
:param params: String (naked) args, or list of parameters to pass to target function.
:type params: string, tuple, list
:param return_out: Return output of function call otherwise None
:type return_out: bool
"""
if isinstance(params, string_types):
param_str = params
elif isinstance(params, (tuple, list)):
param_str = self.__serialize_params(params)
else:
raise ValueError('Invalid type specified for function parameters')
exec_str = 'angular.element(arguments[0]).controller().%s(%s);' % (func, param_str)
if return_out:
return self.__type2python(
self.browser.execute_script('return {}'.format(exec_str), element))
else:
self.browser.execute_script(exec_str, element)
def ng2_call_component_function(self, element, func, params='', return_out=False):
"""
:Description: Will execute the component instance function with provided parameters.
:Warning: This will only work for Angular components.
:param element: Element for browser instance to target.
:param func: Function to execute from component instance.
:type func: string
:param params: String (naked) args, or list of parameters to pass to target function.
:type params: string, tuple, list
:param return_out: Return output of function call otherwise None
:type return_out: bool
"""
if isinstance(params, string_types):
param_str = params
elif isinstance(params, (tuple, list)):
param_str = self.__serialize_params(params)
else:
raise ValueError('Invalid type specified for function parameters')
exec_str = 'ng.probe(arguments[0]).componentInstance.%s(%s);' % (func, param_str)
if return_out:
return self.__type2python(
self.browser.execute_script('return {}'.format(exec_str), element))
else:
self.browser.execute_script(exec_str, element)
def ensure_timezone(func, argname, arg):
"""Argument preprocessor that converts the input into a tzinfo object.
Usage
-----
>>> from zipline.utils.preprocess import preprocess
>>> @preprocess(tz=ensure_timezone)
... def foo(tz):
... return tz
>>> foo('utc')
<UTC>
"""
if isinstance(arg, tzinfo):
return arg
if isinstance(arg, string_types):
return timezone(arg)
raise TypeError(
"{func}() couldn't convert argument "
"{argname}={arg!r} to a timezone.".format(
func=_qualified_name(func),
argname=argname,
arg=arg,
),
)
def user_login(self):
self.log.info('Google User Login for: {}'.format(self.username))
if not isinstance(self.username, six.string_types) or not isinstance(self.password, six.string_types):
raise AuthException("Username/password not correctly specified")
user_login = perform_master_login(self.username, self.password, self.GOOGLE_LOGIN_ANDROID_ID)
try:
refresh_token = user_login.get('Token', None)
except ConnectionError as e:
raise AuthException("Caught ConnectionError: %s", e)
if refresh_token is not None:
self._refresh_token = refresh_token
self.log.info('Google User Login successful.')
else:
self._refresh_token = None
raise AuthException("Invalid Google Username/password")
self.get_access_token()
return self._login
def scopes_to_string(scopes):
"""Converts scope value to a string.
If scopes is a string then it is simply passed through. If scopes is an
iterable then a string is returned that is all the individual scopes
concatenated with spaces.
Args:
scopes: string or iterable of strings, the scopes.
Returns:
The scopes formatted as a single string.
"""
if isinstance(scopes, six.string_types):
return scopes
else:
return ' '.join(scopes)
def string_to_scopes(scopes):
"""Converts stringifed scope value to a list.
If scopes is a list then it is simply passed through. If scopes is an
string then a list of each individual scope is returned.
Args:
scopes: a string or iterable of strings, the scopes.
Returns:
The scopes in a list.
"""
if not scopes:
return []
if isinstance(scopes, six.string_types):
return scopes.split(' ')
else:
return scopes
def __init__(self, value):
"""
Initializer value can be:
- integer_type: absolute days from epoch (1970, 1, 1). Can be negative.
- datetime.date: built-in date
- string_type: a string time of the form "yyyy-mm-dd"
"""
if isinstance(value, six.integer_types):
self.days_from_epoch = value
elif isinstance(value, (datetime.date, datetime.datetime)):
self._from_timetuple(value.timetuple())
elif isinstance(value, six.string_types):
self._from_datestring(value)
else:
raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def generic_type_name(v):
"""Return a descriptive type name that isn't Python specific. For example,
an int value will return 'integer' rather than 'int'."""
if isinstance(v, numbers.Integral):
# Must come before real numbers check since integrals are reals too
return 'integer'
elif isinstance(v, numbers.Real):
return 'float'
elif isinstance(v, (tuple, list)):
return 'list'
elif isinstance(v, six.string_types):
return 'string'
elif v is None:
return 'null'
else:
return type(v).__name__
def process_response(self, response):
import six
body = response['body']['string']
# backward compatibility. under 2.7 response body is unicode, under 3.5 response body is
# bytes. when set the value back, the same type must be used.
body_is_string = isinstance(body, six.string_types)
content_in_string = (response['body']['string'] or b'').decode('utf-8')
index = content_in_string.find(LargeResponseBodyProcessor.control_flag)
if index > -1:
length = int(content_in_string[index + len(LargeResponseBodyProcessor.control_flag):])
if body_is_string:
response['body']['string'] = '0' * length
else:
response['body']['string'] = bytes([0] * length)
return response
def _process_response_recording(self, response):
if self.in_recording:
# make header name lower case and filter unwanted headers
headers = {}
for key in response['headers']:
if key.lower() not in self.FILTER_HEADERS:
headers[key.lower()] = response['headers'][key]
response['headers'] = headers
body = response['body']['string']
if body and not isinstance(body, six.string_types):
response['body']['string'] = body.decode('utf-8')
for processor in self.recording_processors:
response = processor.process_response(response)
if not response:
break
else:
for processor in self.replay_processors:
response = processor.process_response(response)
if not response:
break
return response
def __init__(self, fileobj):
global rrule
if not rrule:
from dateutil import rrule
if isinstance(fileobj, string_types):
self._s = fileobj
# ical should be encoded in UTF-8 with CRLF
fileobj = open(fileobj, 'r')
elif hasattr(fileobj, "name"):
self._s = fileobj.name
else:
self._s = repr(fileobj)
self._vtz = {}
self._parse_rfc(fileobj.read())
def extract_options(inc, global_options=None):
global_options = global_options or []
if global_options and isinstance(global_options, six.string_types):
global_options = [global_options]
if '|' not in inc:
return (inc, global_options)
inc, opts = inc.split('|')
return (inc, parse_sync_options(opts) + global_options)