def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :param s: The byte string or text string to convert.
    :return: ``s`` decoded from UTF-8 (Python 3, bytes input), encoded to
        UTF-8 (Python 2, unicode input), or ``s`` itself when it already is
        the native string type.
    :raise UnicodeError: The input string is not UTF-8 decodeable.
    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        # Wrap the value in a 1-tuple: a bare tuple on the right of ``%``
        # would be unpacked as the formatting arguments, producing either a
        # misleading message or the repr of the wrong object.
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    # Already the native str type for this interpreter.
    return s
python类PY3的实例源码
def _bn_to_int(self, bn):
    """Convert a non-NULL BIGNUM pointer into a Python integer.

    :param bn: cdata pointer handed to the ``BN_*`` functions of
        ``self._lib``; must not be ``self._ffi.NULL``.
    :return: The integer value read back from ``bn``.
    """
    assert bn != self._ffi.NULL

    if not six.PY3:
        # Under Python 2 the best we can do is hex()
        hex_ptr = self._lib.BN_bn2hex(bn)
        self.openssl_assert(hex_ptr != self._ffi.NULL)
        digits = self._ffi.string(hex_ptr)
        # The digits are read out before the hex buffer is released.
        self._lib.OPENSSL_free(hex_ptr)
        return int(digits, 16)

    # Python 3 has constant time from_bytes, so use that.
    byte_count = self._lib.BN_num_bytes(bn)
    raw = self._ffi.new("unsigned char[]", byte_count)
    written = self._lib.BN_bn2bin(bn, raw)
    # A zero length means the BN has value 0
    self.openssl_assert(written >= 0)
    return int.from_bytes(self._ffi.buffer(raw)[:written], "big")
def test_stack_output(self):
    """Check the HTML produced by ``mappings.stack_output``.

    Covers a plain string, ``None``, a list, a dict and a URL string.
    """
    render = mappings.stack_output

    # Scalars: a plain string is wrapped in <pre>, None renders as empty.
    self.assertEqual(u'<pre>foo</pre>', render('foo'))
    self.assertEqual(u'', render(None))

    # Lists are rendered as pretty-printed JSON.
    # On Python 3, the pretty JSON output doesn't add space before newline
    if six.PY3:
        list_json = """[\n "one",\n "two",\n "three"\n]"""
    else:
        list_json = """[\n "one", \n "two", \n "three"\n]"""
    list_html = u'<pre>%s</pre>' % html.escape(list_json)
    self.assertEqual(list_html, render(['one', 'two', 'three']))

    # Dicts are rendered as pretty-printed JSON as well.
    dict_json = """{\n "foo": "bar"\n}"""
    dict_html = u'<pre>%s</pre>' % html.escape(dict_json)
    self.assertEqual(dict_html, render({'foo': 'bar'}))

    # A URL string becomes a link that opens in a new window.
    link_html = (u'<a href="http://www.example.com/foo" target="_blank">'
                 'http://www.example.com/foo</a>')
    self.assertEqual(link_html, render('http://www.example.com/foo'))
def test_admin_metadata_defs_create_namespace_invalid_json_post_raw(self):
    """Posting raw, non-JSON input must yield a namespace-loading form error."""
    response = self.client.post(
        reverse(constants.METADATA_CREATE_URL),
        {'source_type': 'raw', 'direct_input': 'invalidjson'})

    # The expected decode-failure wording differs between Python 2 and 3.
    if six.PY3:
        reason = 'Expecting value: line 1 column 1 (char 0).'
    else:
        reason = 'No JSON object could be decoded.'
    expected_error = 'There was a problem loading the namespace: ' + reason

    self.assertFormError(response, "form", None, [expected_error])
def write(self, buffer):
    """Emit ``self.data`` into ``buffer`` according to ``self.mode``.

    :param buffer: object exposing ``put(value, bit_length)``.
    """
    if self.mode == MODE_NUMBER:
        # Numeric data goes out in groups of up to three characters; the
        # bit width for each group is looked up by the group's length.
        for start in xrange(0, len(self.data), 3):
            group = self.data[start:start + 3]
            width = NUMBER_LENGTH[len(group)]
            buffer.put(int(group), width)
        return

    if self.mode == MODE_ALPHA_NUM:
        # Alphanumeric data goes out in pairs (11 bits); a trailing single
        # character is written on its own (6 bits).
        for start in xrange(0, len(self.data), 2):
            pair = self.data[start:start + 2]
            if len(pair) <= 1:
                buffer.put(ALPHA_NUM.find(pair), 6)
            else:
                combined = (ALPHA_NUM.find(pair[0]) * 45 +
                            ALPHA_NUM.find(pair[1]))
                buffer.put(combined, 11)
        return

    # Any other mode: one 8-bit value per element of the data.
    if six.PY3:
        # Iterating a bytestring in Python 3 returns an integer,
        # no need to ord().
        octets = self.data
    else:
        octets = [ord(ch) for ch in self.data]
    for octet in octets:
        buffer.put(octet, 8)
def write_response_to_filepath_or_buffer(filepath_or_buffer, response):
    """
    Writes the response content to the filepath or buffer.

    :param filepath_or_buffer: either an object with a ``write`` method, or
        a path that is opened in ``"wb"`` mode.
    :param response: object exposing ``iter_content(chunk_size=...)`` that
        yields the body as byte chunks.

    When a buffer is given and it is not binary, the bytes are decoded as
    UTF-8 on Python 3 before being written. After writing, the buffer is
    rewound to position 0 if it reports itself as seekable.
    """
    import codecs
    import io

    if not hasattr(filepath_or_buffer, "write"):
        # A path: the raw bytes are written to the file unchanged.
        with open(filepath_or_buffer, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        return

    if six.PY3 and filepath_or_buffer is sys.stdout:
        # Write bytes to stdout (https://stackoverflow.com/a/23932488)
        filepath_or_buffer = filepath_or_buffer.buffer

    decoder = None
    if six.PY3:
        # io.BytesIO and other binary io streams do not necessarily carry a
        # ``mode`` attribute, so recognise them by type before falling back
        # to the mode string.
        is_binary = (
            isinstance(filepath_or_buffer,
                       (io.RawIOBase, io.BufferedIOBase)) or
            "b" in getattr(filepath_or_buffer, "mode", "w"))
        if not is_binary:
            # An incremental decoder keeps the tail of a multi-byte
            # character that straddles two chunks instead of failing on it.
            decoder = codecs.getincrementaldecoder("utf-8")()

    for chunk in response.iter_content(chunk_size=1024):
        if not chunk:
            continue
        if decoder is not None:
            chunk = decoder.decode(chunk)
            if not chunk:
                # Only part of a character so far; nothing to write yet.
                continue
        filepath_or_buffer.write(chunk)

    if decoder is not None:
        # Flush the decoder; raises UnicodeDecodeError if the body ended in
        # the middle of a character.
        tail = decoder.decode(b"", final=True)
        if tail:
            filepath_or_buffer.write(tail)

    # Not every object with ``write`` also implements ``seekable``.
    seekable = getattr(filepath_or_buffer, "seekable", None)
    if seekable is not None and seekable():
        filepath_or_buffer.seek(0)
def read_content(queue):
    """Read one content item, including nested children, from ``queue``.

    Generator: each ``yield`` hands back the result of ``queue.get()`` (or of
    a recursive ``read_content`` call) and the finished ``Content`` is
    delivered through ``defer.returnValue``.
    NOTE(review): presumably driven by Twisted's ``inlineCallbacks``; the
    decorator is not visible here - confirm.

    :param queue: source of frames; the first frame's payload is the header,
        later frames carry body payloads with a ``content`` attribute.
    """
    header_frame = yield queue.get()
    header = header_frame.payload

    # Child contents come first, one per unit of the header's weight.
    children = []
    for _ in range(header.weight):
        child = yield read_content(queue)
        children.append(child)

    expected = header.size
    received = 0
    buf = six.StringIO()
    while received < expected:
        body_frame = yield queue.get()
        chunk = body_frame.payload.content
        # if this is the first instance of real binary content, convert the
        # string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        switch_to_bytes = (six.PY3 and isinstance(chunk, bytes) and
                           isinstance(buf, six.StringIO))
        if switch_to_bytes:
            buf = six.BytesIO()
        buf.write(chunk)
        received += len(chunk)

    defer.returnValue(
        Content(buf.getvalue(), children, header.properties.copy()))
def _instance_callable(obj):
    """Given an object, return True if the object is callable.

    For classes, return True if instances would be callable, i.e. if the
    class or one of its ancestors defines ``__call__``.
    """
    if not isinstance(obj, ClassTypes):
        # already an instance
        return getattr(obj, '__call__', None) is not None

    if six.PY3:
        # *could* be broken by a class overriding __mro__ or __dict__ via
        # a metaclass
        return any(
            base.__dict__.get('__call__') is not None
            for base in (obj,) + obj.__mro__)

    # uses __bases__ instead of __mro__ so that we work with old style classes
    if obj.__dict__.get('__call__') is not None:
        return True
    return any(_instance_callable(base) for base in obj.__bases__)
def query_params(self, value=None):
    """
    Return or set a dictionary of query params

    With ``value`` given, the result of ``URL._mutate`` with the re-encoded
    query is returned; without it, the parsed current query is returned.

    :param dict value: new dictionary of values
    """
    if value is not None:
        encoded = unicode_urlencode(value, doseq=True)
        return URL._mutate(self, query=encoded)

    raw_query = self._tuple.query
    if raw_query is None:
        raw_query = ''

    if six.PY3:
        return parse_qs(raw_query, True)

    # In Python 2.6, urlparse needs a bytestring so we encode and then
    # decode the result.
    parsed = parse_qs(to_utf8(raw_query), True)
    return dict_to_unicode(parsed)
def run_with(*drivers):
    """Used to mark tests that are only applicable for certain db driver.

    Skips test if driver is not available.

    The decorator records ``drivers`` as a ``_run_with`` attribute, either on
    every ``test_*`` callable of a ``TestBase`` subclass or on the decorated
    object itself.
    """
    def decorator(test):
        is_test_class = isinstance(test, type) and issubclass(test, TestBase)
        if not is_test_class:
            test._run_with = drivers
            return test

        # Decorate all test methods
        for name in dir(test):
            member = getattr(test, name)
            if not (callable(member) and name.startswith('test_')):
                continue
            # On Python 2 the attribute is stored on the underlying function
            # (``__func__``) rather than on the method object.
            target = member if six.PY3 else member.__func__
            target._run_with = drivers
        return test

    return decorator
def _test_run_expirer_ttl_enabled(self, ttl_name, data_name):
    """Run ``panko-expirer`` with a TTL option set and check its output.

    :param ttl_name: option name written as ``<ttl_name>=1`` into the
        ``[database]`` section of the temporary config file.
    :param data_name: data kind expected in the "Dropping ... data" message.
    """
    config_text = ("[database]\n"
                   "%s=1\n"
                   "connection=log://localhost\n" % ttl_name)
    if six.PY3:
        config_text = config_text.encode('utf-8')
    self.tempfile = fileutils.write_to_tempfile(content=config_text,
                                                prefix='panko',
                                                suffix='.conf')

    command = ['panko-expirer', '-d', "--config-file=%s" % self.tempfile]
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    stdout, _unused = process.communicate()
    self.assertEqual(0, process.poll())

    # The captured stdout is bytes on Python 3, so compare against bytes.
    expected = "Dropping %s data with TTL 1" % data_name
    if six.PY3:
        expected = expected.encode('utf-8')
    self.assertIn(expected, stdout)
def _verify_docker_image_size(self, image_name):
    """Verifies size of Docker image.

    Pulls the image, then reads its size from ``docker inspect``.

    Args:
      image_name: name of the Docker image.

    Returns:
      True if image size is within the limits, False otherwise (including
      when the size could not be determined).
    """
    shell_call(['docker', 'pull', image_name])
    inspect_command = ['docker', 'inspect', '--format={{.Size}}', image_name]
    try:
        raw_size = subprocess.check_output(inspect_command).strip()
        image_size = int(raw_size) if PY3 else long(raw_size)
    except (ValueError, subprocess.CalledProcessError) as e:
        logging.error('Failed to determine docker image size: %s', e)
        return False

    logging.info('Size of docker image %s is %d', image_name, image_size)
    within_limit = image_size <= MAX_DOCKER_IMAGE_SIZE
    if not within_limit:
        logging.error('Image size exceeds limit %d', MAX_DOCKER_IMAGE_SIZE)
    return within_limit
def sort(cmpfn):
    """Sort the elements of ``this`` in place and return ``this``.

    NOTE(review): ``this`` is not a parameter; it is resolved from the
    enclosing/global scope - confirm how it is bound.

    :param cmpfn: comparison object; it is passed to ``sort_compare`` only
        when its ``is_callable()`` is true, otherwise ``None`` is passed.
    """
    if this.Class not in ('Array', 'Arguments'):
        return this.to_object()  # do nothing

    items = [this.get(six.text_type(index)) for index in xrange(len(this))]
    if not items:
        return this

    if not cmpfn.is_callable():
        cmpfn = None

    def compare(left, right):
        return sort_compare(left, right, cmpfn)

    # list.sort() takes ``cmp`` only on Python 2; Python 3 needs a key.
    if six.PY3:
        items.sort(key=functools.cmp_to_key(compare))
    else:
        items.sort(cmp=compare)

    # Write the sorted values back under their new indices.
    for index, item in enumerate(items):
        this.put(six.text_type(index), item)
    return this
def compute_signature(self, uri, params, utf=PY3):
    """Compute the signature for a given request

    The signed text is ``uri`` followed by every key and value of ``params``
    in sorted key order; it is HMAC-SHA1 signed with ``self.token`` and the
    digest is base64 encoded.

    :param uri: full URI that Twilio requested on your server
    :param params: post vars that Twilio sent with the request
    :param utf: whether return should be bytestring or unicode (python3)

    :returns: The computed signature
    """
    payload = uri
    if len(params) > 0:
        for key, value in sorted(params.items()):
            payload += key + value

    # Sign the UTF-8 encoded payload and base64 encode the raw digest.
    digest = hmac.new(self.token, payload.encode("utf-8"), sha1).digest()
    signature = base64.b64encode(digest)
    if utf:
        signature = signature.decode('utf-8')
    return signature.strip()
test_sync_client_hooks.py 文件源码
项目:opentracing-python-instrumentation
作者: uber-common
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def install_hooks(request):
    """Install the urllib2 hooks around a test and restore state afterwards.

    Generator: yields the urllib module under test (or ``None`` when
    ``urllib2`` is requested on Python 3). After the yield it restores the
    previous opener and ``CONFIG`` header settings.

    :param request: object providing ``getfixturevalue('urllibver')``.
    """
    urllibver = request.getfixturevalue('urllibver')
    if urllibver != 'urllib2':
        module = urllib.request
    elif six.PY3:
        # 'urllib2' requested on Python 3: yield None and stop.
        yield None
        return
    else:
        module = urllib2

    # Remember everything that is changed below so it can be put back.
    saved_opener = module._opener
    saved_name_headers = CONFIG.callee_name_headers
    saved_endpoint_headers = CONFIG.callee_endpoint_headers

    urllib2_hooks.install_patches.__original_func()
    CONFIG.callee_name_headers = ['Remote-Loc']
    CONFIG.callee_endpoint_headers = ['Remote-Op']

    yield module

    module.install_opener(saved_opener)
    CONFIG.callee_name_headers = saved_name_headers
    CONFIG.callee_endpoint_headers = saved_endpoint_headers
def __init__(self, data):
    """Build a fake response from a status code or a description dict.

    :param data: either a dict with optional ``status_code`` (default 200),
        ``text`` (default ``""``) and ``headers`` keys, or any other value,
        which is stored as the status code.
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)
    # Fake the text attribute to streamline Response creation
    text = data.get('text', "")
    is_structured = isinstance(text, (dict, list))
    # dict/list bodies are serialised to JSON and get a JSON content type.
    self._content = json.dumps(text) if is_structured else text
    if is_structured:
        default_headers = {
            "Content-Type": "application/json",
        }
    else:
        default_headers = {}
    # On Python 3 a text body is stored as UTF-8 bytes.
    if six.PY3 and isinstance(self._content, six.string_types):
        self._content = self._content.encode('utf-8', 'strict')
    self.headers = data.get('headers') or default_headers
def testLongIntegers(self):
    """rrule must accept ``long`` arguments (Python 2 only)."""
    if PY3:  # There is no longs in python3
        return

    start = datetime(1997, 9, 2, 9, 0)

    minutely = rrule(MINUTELY,
                     count=long(2),
                     interval=long(2),
                     bymonth=long(2),
                     byweekday=long(3),
                     byhour=long(6),
                     byminute=long(6),
                     bysecond=long(6),
                     dtstart=start)
    self.assertEqual(list(minutely),
                     [datetime(1998, 2, 5, 6, 6, 6),
                      datetime(1998, 2, 12, 6, 6, 6)])

    yearly = rrule(YEARLY,
                   count=long(2),
                   bymonthday=long(5),
                   byweekno=long(2),
                   dtstart=start)
    self.assertEqual(list(yearly),
                     [datetime(1998, 1, 5, 9, 0),
                      datetime(2004, 1, 5, 9, 0)])
def testToStrLongIntegers(self):
    """Run the rrulestr reverse test on rules built from ``long`` values.

    Only meaningful on Python 2.
    """
    if PY3:  # There is no longs in python3
        return

    start = datetime(1997, 9, 2, 9, 0)

    minutely = rrule(MINUTELY,
                     count=long(2),
                     interval=long(2),
                     bymonth=long(2),
                     byweekday=long(3),
                     byhour=long(6),
                     byminute=long(6),
                     bysecond=long(6),
                     dtstart=start)
    self._rrulestr_reverse_test(minutely)

    yearly = rrule(YEARLY,
                   count=long(2),
                   bymonthday=long(5),
                   byweekno=long(2),
                   dtstart=start)
    self._rrulestr_reverse_test(yearly)
def tzname_in_python2(namefunc):
    """Change unicode output into bytestrings in Python 2

    tzname() API changed in Python 3. It used to return bytes, but was changed
    to unicode strings

    :param namefunc: callable returning a zone name or ``None``.
    :return: a wrapper around ``namefunc`` that, when not running on
        Python 3, returns ``name.encode()`` for any non-``None`` result. The
        wrapper keeps the name and docstring of ``namefunc``.
    """
    import functools

    # functools.wraps copies __name__/__doc__ (and sets __wrapped__) so the
    # decorated function still identifies as the original one.
    @functools.wraps(namefunc)
    def adjust_encoding(*args, **kwargs):
        name = namefunc(*args, **kwargs)
        if name is not None and not PY3:
            name = name.encode()
        return name

    return adjust_encoding
# The following is adapted from Alexander Belopolsky's tz library
# https://github.com/abalkin/tz
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    The supplied UUID must be a version 4 UUID object.

    :param source_uuid: a UUID object, or a string that is first parsed
        with ``uuid.UUID``.
    :return: the lower-cased first 12 characters of the base32 encoding of
        the UUID's ``time`` field.
    :raise ValueError: the UUID is not version 4.
    """
    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)

    version = source_uuid.version
    if version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)

    # The first 12 bytes (= 60 bits) of base32-encoded output is our data
    short_id = base64.b32encode(six.b(random_bytes))[:12].lower()
    # b32encode returns bytes; on Python 3 they are decoded to text.
    return short_id.decode('utf-8') if six.PY3 else short_id