def __validate(self):
if type(self._key) == six.binary_type:
if self._range_end:
self._key = KeySet(self._key, range_end=self._range_end)
else:
self._key = KeySet(self._key)
elif isinstance(self._key, KeySet):
pass
else:
raise TypeError(
'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))
if self._key.type == KeySet._SINGLE:
self._range_end = None
elif self._key.type == KeySet._PREFIX:
self._range_end = _increment_last_byte(self._key.key)
elif self._key.type == KeySet._RANGE:
self._range_end = self._key.range_end
else:
raise Exception('logic error')
python类binary_type()的实例源码
def __validate(self):
if type(self._key) == six.binary_type:
self._key = KeySet(self._key)
elif isinstance(self._key, KeySet):
pass
else:
raise TypeError(
'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))
if self._return_previous is not None and type(self._return_previous) != bool:
raise TypeError('return_previous must be bool, not {}'.format(
type(self._return_previous)))
if self._key.type == KeySet._SINGLE:
self._range_end = None
elif self._key.type == KeySet._PREFIX:
self._range_end = _increment_last_byte(self._key.key)
elif self._key.type == KeySet._RANGE:
self._range_end = self._key.range_end
else:
raise Exception('logic error')
def __init__(self, key, return_previous=None):
"""
:param key: The key or key set to delete.
:type key: bytes or :class:`txaioetcd.KeySet`
:param return_previous: If enabled, return the deleted key-value pairs
:type return_previous: bool or None
"""
Op.__init__(self)
if key is not None and type(key) != six.binary_type and not isinstance(key, KeySet):
raise TypeError('key must be bytes or KeySet, not {}'.format(type(key)))
if isinstance(key, KeySet):
self.key = key
else:
self.key = KeySet(key)
if return_previous is not None and type(return_previous) != bool:
raise TypeError('return_previous must be bool, not {}'.format(type(return_previous)))
self.return_previous = return_previous
def _apply(self, value):
value = super(Array, self)._apply(value) # type: Sequence
if self._has_errors:
return None
if isinstance(value, (binary_type, text_type)):
return self._invalid_value(
value = value,
reason = self.CODE_WRONG_TYPE,
template_vars = {
'incoming': self.get_type_name(type(value)),
'allowed': self.get_allowed_type_names(),
},
)
return value
def assign_attr_value(attr, val):
from mmdnn.conversion.common.IR.graph_pb2 import TensorShape
'''Assign value to AttrValue proto according to data type.'''
if isinstance(val, bool):
attr.b = val
elif isinstance(val, integer_types):
attr.i = val
elif isinstance(val, float):
attr.f = val
elif isinstance(val, binary_type) or isinstance(val, text_type):
if hasattr(val, 'encode'):
val = val.encode()
attr.s = val
elif isinstance(val, TensorShape):
attr.shape.MergeFromString(val.SerializeToString())
elif isinstance(val, list):
if not val: return
if isinstance(val[0], integer_types):
attr.list.i.extend(val)
elif isinstance(val[0], TensorShape):
attr.list.shape.extend(val)
else:
raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0]))
else:
raise NotImplementedError('AttrValue cannot be of %s' % type(val))
def write_np_values(values, f):
"""
Arguments:
values: {str: np.array}
f: filename or filelike object
"""
with ZipFile(f, 'w') as zf:
for k, v in values.items():
# Need to do this because Python zipfile has some odd support for filenames:
# http://bugs.python.org/issue24110
if len(k) == 16 and isinstance(k, six.binary_type): # valid UUID bytes
zf.writestr(str(uuid.UUID(bytes=k)), v.tostring())
else:
zf.writestr(six.u(k), v.tostring())
zf.writestr(MANIFEST_FILENAME, json_dumps_manifest(values))
def generate_scalars_binary(scalars_map, preceding_symbols=0):
for ion_type, values in six.iteritems(scalars_map):
for native, expected in values:
native_expected = expected
has_symbols = False
if native is None:
# An un-adorned 'None' doesn't contain enough information to determine its Ion type
native_expected = b'\x0f'
elif ion_type is IonType.CLOB:
# All six.binary_type are treated as BLOBs unless wrapped by an _IonNature
tid = six.byte2int(expected) + 0x10 # increment upper nibble for clob -> blob; keep lower nibble
native_expected = bytearray([tid]) + expected[1:]
elif ion_type is IonType.SYMBOL and native is not None:
has_symbols = True
elif ion_type is IonType.STRING:
# Encode all strings as symbols too.
symbol_expected = _serialize_symbol(
IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols)))
yield _Parameter(IonType.SYMBOL.name + ' ' + native,
IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True)
yield _Parameter('%s %s' % (ion_type.name, native), native, native_expected, has_symbols)
wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native)
yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
def convert_unicode_to_str(data):
"""
Py2, always translate to utf8 from unicode
Py3, always translate to unicode
:param data:
:return:
"""
if six.PY2 and isinstance(data, six.text_type):
return data.encode('utf8')
elif six.PY3 and isinstance(data, six.binary_type):
return data.decode('utf8')
elif isinstance(data, collections.Mapping):
return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v))
for k, v in six.iteritems(data))
elif isinstance(data, collections.Iterable) and not isinstance(data, (six.binary_type, six.string_types)):
return type(data)(map(Util.convert_unicode_to_str, data))
return data
def native(s):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using UTF-8 encoding if conversion is necessary.
:raise UnicodeError: The input string is not UTF-8 decodeable.
:raise TypeError: The input is neither :py:class:`bytes` nor
:py:class:`unicode`.
"""
if not isinstance(s, (binary_type, text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if PY3:
if isinstance(s, binary_type):
return s.decode("utf-8")
else:
if isinstance(s, text_type):
return s.encode("utf-8")
return s
def test_get_fingerprint(self):
# Make sure _get_fingerprint() generates a consistent fingerprint
MyObject.VERSION = '1.1'
argspec = 'vulpix'
with mock.patch('inspect.getargspec') as mock_gas:
mock_gas.return_value = argspec
fp = self.ovc._get_fingerprint(MyObject.__name__)
exp_fields = sorted(list(MyObject.fields.items()))
exp_methods = sorted([('remotable_method', argspec),
('remotable_classmethod', argspec)])
expected_relevant_data = (exp_fields, exp_methods)
expected_hash = hashlib.md5(six.binary_type(repr(
expected_relevant_data).encode())).hexdigest()
expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)
self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
"generate a correct fingerprint.")
def test_get_fingerprint_with_child_versions(self):
# Make sure _get_fingerprint() generates a consistent fingerprint
# when child_versions are present
child_versions = {'1.0': '1.0', '1.1': '1.1'}
MyObject.VERSION = '1.1'
MyObject.child_versions = child_versions
argspec = 'onix'
with mock.patch('inspect.getargspec') as mock_gas:
mock_gas.return_value = argspec
fp = self.ovc._get_fingerprint(MyObject.__name__)
exp_fields = sorted(list(MyObject.fields.items()))
exp_methods = sorted([('remotable_method', argspec),
('remotable_classmethod', argspec)])
exp_child_versions = collections.OrderedDict(sorted(
child_versions.items()))
exp_relevant_data = (exp_fields, exp_methods, exp_child_versions)
expected_hash = hashlib.md5(six.binary_type(repr(
exp_relevant_data).encode())).hexdigest()
expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)
self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
"generate a correct fingerprint.")
def _decrypt(self):
decrypted_eval_data = []
for row in EvalRow.objects.all():
decrypted_row = {'pk': row.pk,
'user': row.user_identifier,
'record': row.record_identifier,
'action': row.action,
'timestamp': row.timestamp.__str__()}
gpg = gnupg.GPG()
gpg.import_keys(settings.CALLISTO_EVAL_PRIVATE_KEY)
decrypted_eval_row = six.text_type(
gpg.decrypt(six.binary_type(row.row)))
if decrypted_eval_row:
decrypted_row.update(json.loads(decrypted_eval_row))
decrypted_eval_data.append(decrypted_row)
return decrypted_eval_data
def test_20_dump_drop_restore(self):
# dump db
dump_data = self.client.services.db.dump_db(self.env.super_password,
self.env.dbname)
self.assertIsInstance(dump_data, six.binary_type)
# drop it
self.client.services.db.drop_db(self.env.super_password,
self.env.dbname)
self.assertNotIn(self.env.dbname, self.client.services.db)
# and try to restore it
self.client.services.db.restore_db(self.env.super_password,
self.env.dbname,
dump_data)
self.assertIn(self.env.dbname, self.client.services.db)
# check if objects were restored
time.sleep(2)
cl = self.client.login(self.env.dbname,
self.env.user,
self.env.password)
self.assertIsNotNone(cl.uid)
self.assertIsNotNone(cl.user)
def native(s):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using UTF-8 encoding if conversion is necessary.
:raise UnicodeError: The input string is not UTF-8 decodeable.
:raise TypeError: The input is neither :py:class:`bytes` nor
:py:class:`unicode`.
"""
if not isinstance(s, (binary_type, text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if PY3:
if isinstance(s, binary_type):
return s.decode("utf-8")
else:
if isinstance(s, text_type):
return s.encode("utf-8")
return s
def _from_bytes(value):
"""Converts bytes to a string value, if necessary.
Args:
value: The string/bytes value to be converted.
Returns:
The original value converted to unicode (if bytes) or as passed in
if it started out as unicode.
Raises:
ValueError if the value could not be converted to unicode.
"""
result = (value.decode('utf-8')
if isinstance(value, six.binary_type) else value)
if isinstance(result, six.text_type):
return result
else:
raise ValueError(
'{0!r} could not be converted to unicode'.format(value))
def clean_headers(headers):
"""Forces header keys and values to be strings, i.e not unicode.
The httplib module just concats the header keys and values in a way that
may make the message header a unicode string, which, if it then tries to
contatenate to a binary request body may result in a unicode decode error.
Args:
headers: dict, A dictionary of headers.
Returns:
The same dictionary but with all the keys converted to strings.
"""
clean = {}
try:
for k, v in six.iteritems(headers):
if not isinstance(k, six.binary_type):
k = str(k)
if not isinstance(v, six.binary_type):
v = str(v)
clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
except UnicodeEncodeError:
from oauth2client.client import NonAsciiHeaderError
raise NonAsciiHeaderError(k, ': ', v)
return clean
def recv_packet(self):
try:
packet_text = self._connection.recv()
except WebSocketTimeoutException as e:
raise TimeoutError('recv timed out (%s)' % e)
except SSLError as e:
raise ConnectionError('recv disconnected by SSL (%s)' % e)
except WebSocketConnectionClosedException as e:
raise ConnectionError('recv disconnected (%s)' % e)
except SocketError as e:
raise ConnectionError('recv disconnected (%s)' % e)
if not isinstance(packet_text, six.binary_type):
packet_text = packet_text.encode('utf-8')
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
packet_text)
yield engineIO_packet_type, engineIO_packet_data
def crypto_box(msg, nonce, pk, sk):
_assert_len('nonce', nonce, crypto_box_NONCEBYTES)
_assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
_assert_len('sk', sk, crypto_box_SECRETKEYBYTES)
c = bytearray(crypto_box_MACBYTES + len(msg))
_raise_on_error(
lib.crypto_box_easy(
ffi.cast("unsigned char *", ffi.from_buffer(c)),
msg,
len(msg),
nonce,
pk,
sk,
),
)
return binary_type(c)
def crypto_box_open(c, nonce, pk, sk):
_assert_min_len('c', c, crypto_box_MACBYTES)
_assert_len('nonce', nonce, crypto_box_NONCEBYTES)
_assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
_assert_len('sk', sk, crypto_box_SECRETKEYBYTES)
msg = bytearray(len(c) - crypto_box_MACBYTES)
_raise_on_error(
lib.crypto_box_open_easy(
ffi.cast("unsigned char *", ffi.from_buffer(msg)),
c,
len(c),
nonce,
pk,
sk,
),
)
return binary_type(msg)
def crypto_box_open_afternm(c, nonce, k):
_assert_min_len('c', c, crypto_box_MACBYTES)
_assert_len('nonce', nonce, crypto_box_NONCEBYTES)
_assert_len('k', k, crypto_box_BEFORENMBYTES)
msg = bytearray(len(c) - crypto_box_MACBYTES)
_raise_on_error(
lib.crypto_box_open_easy_afternm(
ffi.cast("unsigned char *", ffi.from_buffer(msg)),
c,
len(c),
nonce,
k,
),
)
return binary_type(msg)