def is_crm_leader(resource, retry=False):
"""
Returns True if the charm calling this is the elected corosync leader,
as returned by calling the external "crm" command.
We allow this operation to be retried to avoid the possibility of getting a
false negative. See LP #1396246 for more info.
"""
if resource == DC_RESOURCE_NAME:
return is_crm_dc()
cmd = ['crm', 'resource', 'show', resource]
try:
status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if not isinstance(status, six.text_type):
status = six.text_type(status, "utf-8")
except subprocess.CalledProcessError:
status = None
if status and get_unit_hostname() in status:
return True
if status and "resource %s is NOT running" % (resource) in status:
raise CRMResourceNotFound("CRM resource %s not found" % (resource))
return False
python类text_type()的实例源码
def bool_from_string(value):
"""Interpret string value as boolean.
Returns True if value translates to True otherwise False.
"""
if isinstance(value, six.string_types):
value = six.text_type(value)
else:
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
raise ValueError(msg)
value = value.strip().lower()
if value in ['y', 'yes', 'true', 't', 'on']:
return True
elif value in ['n', 'no', 'false', 'f', 'off']:
return False
msg = "Unable to interpret string value '%s' as boolean" % (value)
raise ValueError(msg)
def is_crm_dc():
"""
Determine leadership by querying the pacemaker Designated Controller
"""
cmd = ['crm', 'status']
try:
status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if not isinstance(status, six.text_type):
status = six.text_type(status, "utf-8")
except subprocess.CalledProcessError as ex:
raise CRMDCNotFound(str(ex))
current_dc = ''
for line in status.split('\n'):
if line.startswith('Current DC'):
# Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
current_dc = line.split(':')[1].split()[0]
if current_dc == get_unit_hostname():
return True
elif current_dc == 'NONE':
raise CRMDCNotFound('Current DC: NONE')
return False
json_format.py 文件源码
项目:ios-xr-grpc-python
作者: cisco-grpc-connection-libs
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def Parse(text, message, ignore_unknown_fields=False):
"""Parses a JSON representation of a protocol message into a message.
Args:
text: Message JSON representation.
message: A protocol buffer message to merge into.
ignore_unknown_fields: If True, do not raise errors for unknown fields.
Returns:
The same message passed as argument.
Raises::
ParseError: On JSON parsing problems.
"""
if not isinstance(text, six.text_type): text = text.decode('utf-8')
try:
if sys.version_info < (2, 7):
# object_pair_hook is not supported before python2.7
js = json.loads(text)
else:
js = json.loads(text, object_pairs_hook=_DuplicateChecker)
except ValueError as e:
raise ParseError('Failed to load JSON: {0}.'.format(str(e)))
return ParseDict(js, message, ignore_unknown_fields)
json_format.py 文件源码
项目:ios-xr-grpc-python
作者: cisco-grpc-connection-libs
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def _ConvertInteger(value):
"""Convert an integer.
Args:
value: A scalar value to convert.
Returns:
The integer value.
Raises:
ParseError: If an integer couldn't be consumed.
"""
if isinstance(value, float) and not value.is_integer():
raise ParseError('Couldn\'t parse integer: {0}.'.format(value))
if isinstance(value, six.text_type) and value.find(' ') != -1:
raise ParseError('Couldn\'t parse integer: "{0}".'.format(value))
return int(value)
def deploy(self, task):
"""Perform a deployment to a node."""
manager_utils.node_power_action(task, states.REBOOT)
if CONF.ansible.use_ramdisk_callback:
return states.DEPLOYWAIT
node = task.node
ip_addr = _get_node_ip_dhcp(task)
try:
self._ansible_deploy(task, ip_addr)
except Exception as e:
error = _('Deploy failed for node %(node)s: '
'Error: %(exc)s') % {'node': node.uuid,
'exc': six.text_type(e)}
LOG.exception(error)
deploy_utils.set_failed_state(task, error, collect_logs=False)
else:
self.reboot_to_instance(task)
return states.DEPLOYDONE
def test__parse_root_device_hints_fail_advanced(self):
hints = {"wwn": "s!= fake wwn",
"size": ">= 12345",
"name": "<or> spam <or> ham",
"rotational": True}
expected = {"wwn": "s!= fake%20wwn",
"name": "<or> spam <or> ham",
"size": ">= 12345"}
props = self.node.properties
props['root_device'] = hints
self.node.properties = props
self.node.save()
with task_manager.acquire(self.context, self.node.uuid) as task:
exc = self.assertRaises(
exception.InvalidParameterValue,
ansible_deploy._parse_root_device_hints, task.node)
for key, value in expected.items():
self.assertIn(six.text_type(key), six.text_type(exc))
self.assertIn(six.text_type(value), six.text_type(exc))
def _params_to_urlencoded(params):
"""
Returns a application/x-www-form-urlencoded ``str`` representing the
key/value pairs in ``params``.
Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
the exception of unicode objects which are utf8-encoded.
"""
def encode(o):
if isinstance(o, six.binary_type):
return o
else:
if isinstance(o, six.text_type):
return o.encode('utf-8')
else:
return str(o).encode('utf-8')
utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
return url_encode(utf8_params)
def test_unregistered_event(self):
project = self.project # force creation
url = '/plugins/github/organizations/{}/webhook/'.format(
project.organization.id,
)
secret = 'b3002c3e321d4b7880360d397db2ccfd'
OrganizationOption.objects.set_value(
organization=project.organization,
key='github:webhook_secret',
value=secret,
)
response = self.client.post(
path=url,
data=PUSH_EVENT_EXAMPLE,
content_type='application/json',
HTTP_X_GITHUB_EVENT='UnregisteredEvent',
HTTP_X_HUB_SIGNATURE='sha1=98196e70369945ffa6b248cf70f7dc5e46dff241',
HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
)
assert response.status_code == 204
def test_invalid_signature_event(self):
project = self.project # force creation
url = '/plugins/github/organizations/{}/webhook/'.format(
project.organization.id,
)
secret = '2d7565c3537847b789d6995dca8d9f84'
OrganizationOption.objects.set_value(
organization=project.organization,
key='github:webhook_secret',
value=secret,
)
response = self.client.post(
path=url,
data=PUSH_EVENT_EXAMPLE,
content_type='application/json',
HTTP_X_GITHUB_EVENT='push',
HTTP_X_HUB_SIGNATURE='sha1=33521abeaaf9a57c2abf486e0ccd54d23cf36fec',
HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
)
assert response.status_code == 401
def test_simple(self):
url = '/plugins/github/installations/webhook/'
response = self.client.post(
path=url,
data=INSTALLATION_EVENT_EXAMPLE,
content_type='application/json',
HTTP_X_GITHUB_EVENT='installation',
HTTP_X_HUB_SIGNATURE='sha1=348e46312df2901e8cb945616ee84ce30d9987c9',
HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
)
assert response.status_code == 204
assert Integration.objects.filter(
provider='github_apps',
external_id=2,
name='octocat',
).exists()
def validate_config(self, organization, config, actor=None):
"""
if config['foo'] and not config['bar']:
raise PluginError('You cannot configure foo with bar')
return config
```
"""
if config.get('name'):
client = self.get_client(actor)
try:
repo = client.get_repo(config['name'])
except Exception as e:
self.raise_error(e)
else:
config['external_id'] = six.text_type(repo['id'])
return config
```
def validate_config(self, organization, config, actor=None):
if config.get('url'):
client = self.get_client(actor)
# parse out the repo name and the instance
parts = urlparse(config['url'])
instance = parts.netloc
name = parts.path.rsplit('_git/', 1)[-1]
project = config.get('project') or name
try:
repo = client.get_repo(instance, name, project)
except Exception as e:
self.raise_error(e, identity=client.auth)
config.update({
'instance': instance,
'project': project,
'name': repo['name'],
'external_id': six.text_type(repo['id']),
'url': repo['_links']['web']['href'],
})
return config
def link_issue(self, request, group, form_data, **kwargs):
comment = form_data.get('comment')
if not comment:
return
_url = '%s/%s/comments' % (self.build_api_url(group, 'stories'), form_data['issue_id'])
try:
req = self.make_api_request(group.project, _url, json_data={"text": comment})
body = safe_urlread(req)
except requests.RequestException as e:
msg = six.text_type(e)
raise PluginError('Error communicating with Pivotal: %s' % (msg, ))
try:
json_resp = json.loads(body)
except ValueError as e:
msg = six.text_type(e)
raise PluginError('Error communicating with Pivotal: %s' % (msg, ))
if req.status_code > 399:
raise PluginError(json_resp['error'])
def get_event_from_url_params(self, group_id, event_id=None, slug_vars=None):
if event_id is not None:
try:
event = Event.objects.get(pk=int(event_id))
except (ValueError, Event.DoesNotExist):
return None
group = event.group
if six.text_type(group.id) != group_id:
return None
else:
try:
group = Group.objects.get(pk=int(group_id))
except (ValueError, Group.DoesNotExist):
return None
event = group.get_latest_event()
event = self._ensure_and_bind_event(event)
if event is None:
return None
if slug_vars is not None:
if slug_vars['org_slug'] != group.organization.slug or \
slug_vars['proj_slug'] != group.project.slug:
return None
return event
def get(self, request, project, *args, **kwargs):
queryset = project.hipchat_tenant_set.select_related('auth_user')
return self.respond(
[
{
'id': t.id,
'room': {
'id': t.room_id,
'name': t.room_name,
'owner': {
'id': t.room_owner_id,
'name': t.room_owner_name,
},
'homepage': t.homepage,
},
'authUser': {
'id': six.text_type(t.auth_user.id),
'name': t.auth_user.get_display_name(),
'username': t.auth_user.username,
'email': t.auth_user.email,
} if t.auth_user else None,
} for t in queryset
]
)
def raise_error(self, exc, identity=None):
if isinstance(exc, ApiUnauthorized):
six.reraise(
InvalidIdentity,
InvalidIdentity(self.message_from_error(exc), identity=identity),
sys.exc_info()[2]
)
elif isinstance(exc, ApiError):
six.reraise(
PluginError,
PluginError(self.message_from_error(exc)),
sys.exc_info()[2]
)
elif isinstance(exc, PluginError):
raise
else:
self.logger.exception(six.text_type(exc))
six.reraise(
PluginError,
PluginError(self.message_from_error(exc)),
sys.exc_info()[2]
)
def bool_from_string(value):
"""Interpret string value as boolean.
Returns True if value translates to True otherwise False.
"""
if isinstance(value, six.string_types):
value = six.text_type(value)
else:
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
raise ValueError(msg)
value = value.strip().lower()
if value in ['y', 'yes', 'true', 't', 'on']:
return True
elif value in ['n', 'no', 'false', 'f', 'off']:
return False
msg = "Unable to interpret string value '%s' as boolean" % (value)
raise ValueError(msg)
def bool_from_string(value):
"""Interpret string value as boolean.
Returns True if value translates to True otherwise False.
"""
if isinstance(value, six.string_types):
value = six.text_type(value)
else:
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
raise ValueError(msg)
value = value.strip().lower()
if value in ['y', 'yes', 'true', 't', 'on']:
return True
elif value in ['n', 'no', 'false', 'f', 'off']:
return False
msg = "Unable to interpret string value '%s' as boolean" % (value)
raise ValueError(msg)
def is_crm_dc():
"""
Determine leadership by querying the pacemaker Designated Controller
"""
cmd = ['crm', 'status']
try:
status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if not isinstance(status, six.text_type):
status = six.text_type(status, "utf-8")
except subprocess.CalledProcessError as ex:
raise CRMDCNotFound(str(ex))
current_dc = ''
for line in status.split('\n'):
if line.startswith('Current DC'):
# Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
current_dc = line.split(':')[1].split()[0]
if current_dc == get_unit_hostname():
return True
elif current_dc == 'NONE':
raise CRMDCNotFound('Current DC: NONE')
return False
def csv_safe_unicode(row, ignoreErrors=True):
"""
Given an array of values, make sure all strings are unicode in Python 3 and
str in Python 2. The csv.writer in Python 2 does not handle unicode
strings and in Python 3 it does not handle byte strings.
:param row: an array which could contain mixed types.
:param ignoreErrors: if True, convert what is possible and ignore errors.
If false, conversion errors will raise an exception.
:returns: either the original array if safe, or an array with all byte
strings converted to unicode in Python 3 or str in Python 2.
"""
# This is unicode in Python 2 and bytes in Python 3
wrong_type = six.text_type if str == six.binary_type else six.binary_type
# If all of the data is not the wrong type, just return it as is. This
# allows non-string types, such as integers and floats.
if not any(isinstance(value, wrong_type) for value in row):
return row
# Convert the wrong type of string to the type needed for this version of
# Python. For Python 2 unicode gets encoded to str (bytes). For Python 3
# bytes get decoded to str (unicode).
func = 'encode' if str == six.binary_type else 'decode'
row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict')
if isinstance(value, wrong_type) else value for value in row]
return row
def tokenize_text(string):
"""
Tokenize input text to paragraphs, sentences and words.
Tokenization to paragraphs is done using simple Newline algorithm
For sentences and words tokenizers above are used
:param string: Text to tokenize
:type string: str or unicode
:return: text, tokenized into paragraphs, sentences and words
:rtype: list of list of list of words
"""
string = six.text_type(string)
rez = []
for part in string.split('\n'):
par = []
for sent in tokenize_sents(part):
par.append(tokenize_words(sent))
if par:
rez.append(par)
return rez
def add(self, offset, entity_type, entity_value):
"""
Adds a found entity to list. The `offset` parameter is used as key, so
only one entity can start at the given offset.
:param offset: Offset in extracted text where the found entity starts.
:param entity_type: The type of entity that is found.
:param entity_value: The found entity.
:type start: ``int``
:type entity_type: ``unicode``
:type entity_value: ``unicode``
"""
self.obj[offset] = {
'type': entity_type,
'value': entity_value,
'entity_id': re.sub(r'\s', '_', entity_value).lower()
}
def set_id(self, data):
"""
Set ID of document. To allow multiple files with the same path, the
digest of supplied data (e.g. first 4KB) is appended to the doc ID.
:param data: Data to append to docuemnt path.
:type data: Anything digestable by hashlib.
"""
digest = hashlib.md5()
digest.update(self.path.encode('utf-8'))
if isinstance(data, unicode):
data = data.encode('utf-8')
digest.update(data)
self.docid = digest.hexdigest()
def _from_bytes(value):
"""Converts bytes to a string value, if necessary.
Args:
value: The string/bytes value to be converted.
Returns:
The original value converted to unicode (if bytes) or as passed in
if it started out as unicode.
Raises:
ValueError if the value could not be converted to unicode.
"""
result = (value.decode('utf-8')
if isinstance(value, six.binary_type) else value)
if isinstance(result, six.text_type):
return result
else:
raise ValueError('%r could not be converted to unicode' % (value,))
def request(self, uri,
method='GET',
body=None,
headers=None,
redirections=1,
connection_type=None):
resp, content = self._iterable.pop(0)
if content == 'echo_request_headers':
content = headers
elif content == 'echo_request_headers_as_json':
content = json.dumps(headers)
elif content == 'echo_request_body':
if hasattr(body, 'read'):
content = body.read()
else:
content = body
elif content == 'echo_request_uri':
content = uri
if isinstance(content, six.text_type):
content = content.encode('utf-8')
return httplib2.Response(resp), content
def _build_query(self, params):
"""Builds a query string.
Args:
params: dict, the query parameters
Returns:
The query parameters properly encoded into an HTTP URI query string.
"""
if self.alt_param is not None:
params.update({'alt': self.alt_param})
astuples = []
for key, value in six.iteritems(params):
if type(value) == type([]):
for x in value:
x = x.encode('utf-8')
astuples.append((key, x))
else:
if isinstance(value, six.text_type) and callable(value.encode):
value = value.encode('utf-8')
astuples.append((key, value))
return '?' + urlencode(astuples)
def exeute(self, method, value):
str_convert_type = (
six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address)
try:
result = getattr(
self.typeclass(value, self.strict_level), method)()
if method == "validate":
result = "NOP [#f1]_"
elif isinstance(result, str_convert_type):
result = '``"{}"``'.format(result)
except TypeError:
result = "E [#f2]_"
except typepy.TypeConversionError:
result = "E [#f3]_"
return result
def prep_blob(self, blob):
"""Cleanup input."""
# remove empty lines
if type(blob) == list:
blob = [line for line in blob if line.strip() != '']
if len(blob) == 1:
blob = blob[0].replace('\\n', '\n').split('\n')
# Split by line
if type(blob) == str or type(blob) == six.text_type:
lines = blob.split('\n')
elif type(blob) == list:
if len(blob) == 1:
lines = blob[0].split('\n')
else:
lines = [line.rstrip() for line in blob]
else:
message = "Unknown input format"
log.debug("%s - '%s", message, blob)
raise ParseException(message)
return lines
def _params_to_urlencoded(params):
"""
Returns a application/x-www-form-urlencoded ``str`` representing the
key/value pairs in ``params``.
Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
the exception of unicode objects which are utf8-encoded.
"""
def encode(o):
if isinstance(o, six.binary_type):
return o
else:
if isinstance(o, six.text_type):
return o.encode('utf-8')
else:
return str(o).encode('utf-8')
utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
return url_encode(utf8_params)