def _params_to_urlencoded(params):
"""
Returns a application/x-www-form-urlencoded ``str`` representing the
key/value pairs in ``params``.
Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
the exception of unicode objects which are utf8-encoded.
"""
def encode(o):
if isinstance(o, six.binary_type):
return o
else:
if isinstance(o, six.text_type):
return o.encode('utf-8')
else:
return str(o).encode('utf-8')
utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
return url_encode(utf8_params)
python类binary_type()的实例源码
def test_last_request(self, mock):
response = requests.post(GetMock.url,
headers={'custom-header': 'huseyin'},
data={'name': 'huseyin'})
self.assertEqual(response.status_code, 200)
last_request = mock.last_request
# Test if last request has expected values.
self.assertEqual(last_request.url, GetMock.url)
body = last_request.body
# In python 3 httpretty backend returns binary string for body.
# So we are decoding it back to unicode to test.
if isinstance(body, six.binary_type):
body = body.decode('utf-8')
self.assertEqual(body, 'name=huseyin')
self.assertEqual(last_request.headers.get('custom-header'),
'huseyin')
# Make sure that class's last_request is same as instances.
self.assertIs(GetMock.last_request, mock.last_request)
def csv_safe_unicode(row, ignoreErrors=True):
"""
Given an array of values, make sure all strings are unicode in Python 3 and
str in Python 2. The csv.writer in Python 2 does not handle unicode
strings and in Python 3 it does not handle byte strings.
:param row: an array which could contain mixed types.
:param ignoreErrors: if True, convert what is possible and ignore errors.
If false, conversion errors will raise an exception.
:returns: either the original array if safe, or an array with all byte
strings converted to unicode in Python 3 or str in Python 2.
"""
# This is unicode in Python 2 and bytes in Python 3
wrong_type = six.text_type if str == six.binary_type else six.binary_type
# If all of the data is not the wrong type, just return it as is. This
# allows non-string types, such as integers and floats.
if not any(isinstance(value, wrong_type) for value in row):
return row
# Convert the wrong type of string to the type needed for this version of
# Python. For Python 2 unicode gets encoded to str (bytes). For Python 3
# bytes get decoded to str (unicode).
func = 'encode' if str == six.binary_type else 'decode'
row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict')
if isinstance(value, wrong_type) else value for value in row]
return row
def _from_bytes(value):
"""Converts bytes to a string value, if necessary.
Args:
value: The string/bytes value to be converted.
Returns:
The original value converted to unicode (if bytes) or as passed in
if it started out as unicode.
Raises:
ValueError if the value could not be converted to unicode.
"""
result = (value.decode('utf-8')
if isinstance(value, six.binary_type) else value)
if isinstance(result, six.text_type):
return result
else:
raise ValueError('%r could not be converted to unicode' % (value,))
def clean_headers(headers):
"""Forces header keys and values to be strings, i.e not unicode.
The httplib module just concats the header keys and values in a way that
may make the message header a unicode string, which, if it then tries to
contatenate to a binary request body may result in a unicode decode error.
Args:
headers: dict, A dictionary of headers.
Returns:
The same dictionary but with all the keys converted to strings.
"""
clean = {}
try:
for k, v in six.iteritems(headers):
if not isinstance(k, six.binary_type):
k = str(k)
if not isinstance(v, six.binary_type):
v = str(v)
clean[_to_bytes(k)] = _to_bytes(v)
except UnicodeEncodeError:
raise NonAsciiHeaderError(k, ': ', v)
return clean
def list_cmd(image_name, project, endpoint, apikey):
"""Short version of list cmd to use with deploy cmd."""
settings = {}
if project:
settings = _get_project_settings(project, endpoint, apikey)
# Run a local docker container to run list-spiders cmd
status_code, logs = _run_list_cmd(project, image_name, settings)
if status_code != 0:
click.echo(logs)
raise shub_exceptions.ShubException(
'Container with list cmd exited with code %s' % status_code)
spiders = utils.valid_spiders(
logs.decode('utf-8') if isinstance(logs, binary_type) else logs)
return spiders
def make_header_prefix(self, message_class, version=Connection.protocol_version, stream_id=0):
if Connection.protocol_version < 3:
return six.binary_type().join(map(uint8_pack, [
0xff & (HEADER_DIRECTION_TO_CLIENT | version),
0, # flags (compression)
stream_id,
message_class.opcode # opcode
]))
else:
return six.binary_type().join(map(uint8_pack, [
0xff & (HEADER_DIRECTION_TO_CLIENT | version),
0, # flags (compression)
0, # MSB for v3+ stream
stream_id,
message_class.opcode # opcode
]))
test_libevreactor.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 50
收藏 0
点赞 0
评论 0
def test_partial_header_read(self, *args):
c = self.make_connection()
header = self.make_header_prefix(SupportedMessage)
options = self.make_options_body()
message = self.make_msg(header, options)
# read in the first byte
c._socket.recv.return_value = message[0:1]
c.handle_read(None, 0)
self.assertEqual(c._iobuf.getvalue(), message[0:1])
c._socket.recv.return_value = message[1:]
c.handle_read(None, 0)
self.assertEqual(six.binary_type(), c._iobuf.getvalue())
# let it write out a StartupMessage
c.handle_write(None, 0)
header = self.make_header_prefix(ReadyMessage, stream_id=1)
c._socket.recv.return_value = self.make_msg(header)
c.handle_read(None, 0)
self.assertTrue(c.connected_event.is_set())
self.assertFalse(c.is_defunct)
test_asyncorereactor.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_partial_header_read(self, *args):
c = self.make_connection()
header = self.make_header_prefix(SupportedMessage)
options = self.make_options_body()
message = self.make_msg(header, options)
c.socket.recv.return_value = message[0:1]
c.handle_read()
self.assertEqual(c._iobuf.getvalue(), message[0:1])
c.socket.recv.return_value = message[1:]
c.handle_read()
self.assertEqual(six.binary_type(), c._iobuf.getvalue())
# let it write out a StartupMessage
c.handle_write()
header = self.make_header_prefix(ReadyMessage, stream_id=1)
c.socket.recv.return_value = self.make_msg(header)
c.handle_read()
self.assertTrue(c.connected_event.is_set())
self.assertFalse(c.is_defunct)
def varint_pack(big):
pos = True
if big == 0:
return b'\x00'
if big < 0:
bytelength = bit_length(abs(big) - 1) // 8 + 1
big = (1 << bytelength * 8) + big
pos = False
revbytes = bytearray()
while big > 0:
revbytes.append(big & 0xff)
big >>= 8
if pos and revbytes[-1] & 0x80:
revbytes.append(0)
revbytes.reverse()
return six.binary_type(revbytes)
def _params_to_urlencoded(params):
"""
Returns a application/x-www-form-urlencoded ``str`` representing the
key/value pairs in ``params``.
Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
the exception of unicode objects which are utf8-encoded.
"""
def encode(o):
if isinstance(o, six.binary_type):
return o
else:
if isinstance(o, six.text_type):
return o.encode('utf-8')
else:
return str(o).encode('utf-8')
utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
return url_encode(utf8_params)
def parse_schema(schema):
if type(schema) is six.binary_type or type(schema) is six.text_type:
return {
'type': schema,
}
elif type(schema) is list:
return {
'type': 'list',
}
elif type(schema) is dict:
title = schema.get(':title', None)
required_elts = list(get_required(schema.keys()))
properties = get_object_properties(schema)
return {
'type': 'object',
'title': title,
'properties': {
prop_name: parse_schema(subschema)
for prop_name, subschema in properties.items()
},
'required': required_elts,
}
else:
raise Exception('Unsupported schema definition')
def native(s):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using UTF-8 encoding if conversion is necessary.
:raise UnicodeError: The input string is not UTF-8 decodeable.
:raise TypeError: The input is neither :py:class:`bytes` nor
:py:class:`unicode`.
"""
if not isinstance(s, (binary_type, text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if PY3:
if isinstance(s, binary_type):
return s.decode("utf-8")
else:
if isinstance(s, text_type):
return s.encode("utf-8")
return s
def native(s):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using UTF-8 encoding if conversion is necessary.
:raise UnicodeError: The input string is not UTF-8 decodeable.
:raise TypeError: The input is neither :py:class:`bytes` nor
:py:class:`unicode`.
"""
if not isinstance(s, (binary_type, text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if PY3:
if isinstance(s, binary_type):
return s.decode("utf-8")
else:
if isinstance(s, text_type):
return s.encode("utf-8")
return s
def get_template_files(template_data=None, template_url=None):
if template_data:
tpl = template_data
elif template_url:
with contextlib.closing(request.urlopen(template_url)) as u:
tpl = u.read()
else:
return {}, None
if not tpl:
return {}, None
if isinstance(tpl, six.binary_type):
tpl = tpl.decode('utf-8')
template = template_format.parse(tpl)
files = {}
_get_file_contents(template, files)
return files, template
def parse(self, data):
"""
Returns the parsed tree.
"""
if isinstance(data, six.binary_type):
if data[:3] == codecs.BOM_UTF8:
encoding = 'utf-8-sig'
else:
encoding = 'latin1'
content = data.decode(encoding).strip()
else:
content = data.strip()
if not content:
return ParseResults()
return self.script.parseString(content, parseAll=True)
def convert_into_with_meta(self, item, resp):
if isinstance(item, six.string_types):
if six.PY2 and isinstance(item, six.text_type):
return UnicodeWithMeta(item, resp)
else:
return StrWithMeta(item, resp)
elif isinstance(item, six.binary_type):
return BytesWithMeta(item, resp)
elif isinstance(item, list):
return ListWithMeta(item, resp)
elif isinstance(item, tuple):
return TupleWithMeta(item, resp)
elif item is None:
return TupleWithMeta((), resp)
else:
return DictWithMeta(item, resp)
def _from_bytes(value):
"""Converts bytes to a string value, if necessary.
Args:
value: The string/bytes value to be converted.
Returns:
The original value converted to unicode (if bytes) or as passed in
if it started out as unicode.
Raises:
ValueError if the value could not be converted to unicode.
"""
result = (value.decode('utf-8')
if isinstance(value, six.binary_type) else value)
if isinstance(result, six.text_type):
return result
else:
raise ValueError(
'{0!r} could not be converted to unicode'.format(value))
def clean_headers(headers):
"""Forces header keys and values to be strings, i.e not unicode.
The httplib module just concats the header keys and values in a way that
may make the message header a unicode string, which, if it then tries to
contatenate to a binary request body may result in a unicode decode error.
Args:
headers: dict, A dictionary of headers.
Returns:
The same dictionary but with all the keys converted to strings.
"""
clean = {}
try:
for k, v in six.iteritems(headers):
if not isinstance(k, six.binary_type):
k = str(k)
if not isinstance(v, six.binary_type):
v = str(v)
clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
except UnicodeEncodeError:
from oauth2client.client import NonAsciiHeaderError
raise NonAsciiHeaderError(k, ': ', v)
return clean
def decode_unicode(input):
"""Decode the unicode of the message, and encode it into utf-8."""
if isinstance(input, dict):
temp = {}
# If the input data is a dict, create an equivalent dict with a
# predictable insertion order to avoid inconsistencies in the
# message signature computation for equivalent payloads modulo
# ordering
for key, value in sorted(six.iteritems(input)):
temp[decode_unicode(key)] = decode_unicode(value)
return temp
elif isinstance(input, (tuple, list)):
# When doing a pair of JSON encode/decode operations to the tuple,
# the tuple would become list. So we have to generate the value as
# list here.
return [decode_unicode(element) for element in input]
elif six.PY2 and isinstance(input, six.text_type):
return input.encode('utf-8')
elif six.PY3 and isinstance(input, six.binary_type):
return input.decode('utf-8')
else:
return input
def replace_vars(data, repl_vars):
"""Return the given data structure with appropriate variables replaced.
Args:
data (Any): Data of an arbitrary type.
Returns:
Any: Data of the same time, possibly with replaced values.
"""
if isinstance(data, (six.text_type, six.binary_type)):
for k, v in repl_vars.items():
data = data.replace('${' + k + '}', v)
return data
if isinstance(data, collections.Sequence):
return type(data)([replace_vars(d, repl_vars) for d in data])
if isinstance(data, collections.Mapping):
return type(data)([(k, replace_vars(v, repl_vars))
for k, v in data.items()])
return data
def clean_markup(self, markup, parser=None):
"""Apply ``Cleaner`` to markup string or document and return a cleaned string or document."""
result_type = type(markup)
if isinstance(markup, six.string_types):
doc = fromstring(markup, parser=parser)
else:
doc = copy.deepcopy(markup)
self(doc)
if issubclass(result_type, six.binary_type):
return tostring(doc, encoding='utf-8')
elif issubclass(result_type, six.text_type):
return tostring(doc, encoding='unicode')
else:
return doc
#: A default Cleaner instance, which kills comments, processing instructions, script tags, style tags.
def __init__(self, *elements):
"""Initialize a Document manually by passing one or more Document elements (Paragraph, Heading, Table, etc.)
Strings that are passed to this constructor are automatically wrapped into Paragraph elements.
:param list[chemdataextractor.doc.element.BaseElement|string] elements: Elements in this Document.
"""
self._elements = []
for element in elements:
# Convert raw text to Paragraph elements
if isinstance(element, six.text_type):
element = Paragraph(element)
elif isinstance(element, six.binary_type):
# Try guess encoding if byte string
encoding = get_encoding(element)
log.warning('Guessed bytestring encoding as %s. Use unicode strings to avoid this warning.', encoding)
element = Paragraph(element.decode(encoding))
element.document = self
self._elements.append(element)
log.debug('%s: Initializing with %s elements' % (self.__class__.__name__, len(self.elements)))
def mixin_meta(item, resp):
if isinstance(item, six.string_types):
if six.PY2 and isinstance(item, six.text_type):
return resource.UnicodeWithMeta(item, resp)
else:
return resource.StrWithMeta(item, resp)
elif isinstance(item, six.binary_type):
return resource.BytesWithMeta(item, resp)
elif isinstance(item, list):
return resource.ListWithMeta(item, resp)
elif isinstance(item, tuple):
return resource.TupleWithMeta(item, resp)
elif item is None:
return resource.TupleWithMeta((), resp)
else:
return resource.DictWithMeta(item, resp)
def _call_audio_metadata(self, audio_meta, filepath):
if audio_meta is False: # metadata crawling is disabled
audio_meta = None
elif audio_meta is None: # no MBID is given, attempt to get
# it from id3 tag
audio_meta = self.crawl_musicbrainz_metadata(filepath)
elif isinstance(audio_meta, (six.string_types, six.binary_type)):
# MBID is given
audio_meta = self.crawl_musicbrainz_metadata(audio_meta)
elif not isinstance(audio_meta, dict):
warn_str = 'The "metadata" input can be "False" (skipped), ' \
'"basestring" (MBID input), "None" (attempt to get ' \
'the MBID from audio file tags) or "dict" (already ' \
'computed)'
warnings.warn(warn_str, stacklevel=2)
return audio_meta
def _assert_shard_manager(self, content, mode, mock_tree, mock_challenges):
with tempfile.NamedTemporaryFile(mode, delete=False) as tmp_file:
tmp_file.write(content)
tmp_file.flush()
nchallenges = 2
sm = ShardManager(
tmp_file.name,
nchallenges=nchallenges)
assert sm.filepath == tmp_file.name
assert len(sm.shards) > 0
mock_challenges.assert_called_once_with(nchallenges)
if isinstance(content, six.binary_type):
mock_tree.assert_called_once_with(
mock_challenges.return_value, content)
else:
mock_tree.assert_called_once_with(
mock_challenges.return_value, bytes(content.encode('utf-8')))
mock_tree.reset_mock()
mock_challenges.reset_mock()
def b64decode(data):
"""JOSE Base64 decode.
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: `bytes` or `unicode`
:returns: Decoded data.
:rtype: bytes
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, six.string_types):
try:
data = data.encode('ascii')
except UnicodeEncodeError:
raise ValueError(
'unicode argument should contain only ASCII characters')
elif not isinstance(data, six.binary_type):
raise TypeError('argument should be a str or unicode')
return base64.urlsafe_b64decode(data + b'=' * (4 - (len(data) % 4)))
def certificate(self, cert, name, alt_host=None, port=443):
"""Verifies the certificate presented at name is cert"""
if alt_host is None:
host = socket.gethostbyname(name)
elif isinstance(alt_host, six.binary_type):
host = alt_host
else:
host = alt_host.encode()
name = name if isinstance(name, six.binary_type) else name.encode()
try:
presented_cert = crypto_util.probe_sni(name, host, port)
except acme_errors.Error as error:
logger.exception(error)
return False
return presented_cert.digest("sha256") == cert.digest("sha256")
def headers_to_str_headers(headers):
'''
Converts dict or tuple-based headers of bytes or str to
tuple-based headers of str, which is the python norm (pep 3333)
'''
ret = []
if isinstance(headers, collections_abc.Mapping):
h = headers.items()
else:
h = headers
if six.PY2: #pragma: no cover
return h
for tup in h:
k, v = tup
if isinstance(k, six.binary_type):
k = k.decode('iso-8859-1')
if isinstance(v, six.binary_type):
v = v.decode('iso-8859-1')
ret.append((k, v))
return ret
def native(s):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using UTF-8 encoding if conversion is necessary.
:raise UnicodeError: The input string is not UTF-8 decodeable.
:raise TypeError: The input is neither :py:class:`bytes` nor
:py:class:`unicode`.
"""
if not isinstance(s, (binary_type, text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if PY3:
if isinstance(s, binary_type):
return s.decode("utf-8")
else:
if isinstance(s, text_type):
return s.encode("utf-8")
return s