python类binary_type()的实例源码

_client_commons.py 文件源码 项目:txaio-etcd 作者: crossbario 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __validate(self):
        if type(self._key) == six.binary_type:
            if self._range_end:
                self._key = KeySet(self._key, range_end=self._range_end)
            else:
                self._key = KeySet(self._key)
        elif isinstance(self._key, KeySet):
            pass
        else:
            raise TypeError(
                'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))

        if self._key.type == KeySet._SINGLE:
            self._range_end = None
        elif self._key.type == KeySet._PREFIX:
            self._range_end = _increment_last_byte(self._key.key)
        elif self._key.type == KeySet._RANGE:
            self._range_end = self._key.range_end
        else:
            raise Exception('logic error')
_client_commons.py 文件源码 项目:txaio-etcd 作者: crossbario 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __validate(self):
        if type(self._key) == six.binary_type:
            self._key = KeySet(self._key)
        elif isinstance(self._key, KeySet):
            pass
        else:
            raise TypeError(
                'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))

        if self._return_previous is not None and type(self._return_previous) != bool:
            raise TypeError('return_previous must be bool, not {}'.format(
                type(self._return_previous)))

        if self._key.type == KeySet._SINGLE:
            self._range_end = None
        elif self._key.type == KeySet._PREFIX:
            self._range_end = _increment_last_byte(self._key.key)
        elif self._key.type == KeySet._RANGE:
            self._range_end = self._key.range_end
        else:
            raise Exception('logic error')
_types.py 文件源码 项目:txaio-etcd 作者: crossbario 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, key, return_previous=None):
        """

        :param key: The key or key set to delete.
        :type key: bytes or :class:`txaioetcd.KeySet`

        :param return_previous: If enabled, return the deleted key-value pairs
        :type return_previous: bool or None
        """
        Op.__init__(self)

        if key is not None and type(key) != six.binary_type and not isinstance(key, KeySet):
            raise TypeError('key must be bytes or KeySet, not {}'.format(type(key)))

        if isinstance(key, KeySet):
            self.key = key
        else:
            self.key = KeySet(key)

        if return_previous is not None and type(return_previous) != bool:
            raise TypeError('return_previous must be bool, not {}'.format(type(return_previous)))

        self.return_previous = return_previous
simple.py 文件源码 项目:filters 作者: eflglobal 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _apply(self, value):
        value = super(Array, self)._apply(value) # type: Sequence

        if self._has_errors:
            return None

        if isinstance(value, (binary_type, text_type)):
            return self._invalid_value(
                value   = value,
                reason  = self.CODE_WRONG_TYPE,

                template_vars = {
                    'incoming': self.get_type_name(type(value)),
                    'allowed':  self.get_allowed_type_names(),
                },
            )

        return value
utils.py 文件源码 项目:MMdnn 作者: Microsoft 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def assign_attr_value(attr, val):
    from mmdnn.conversion.common.IR.graph_pb2 import TensorShape
    '''Assign value to AttrValue proto according to data type.'''
    if isinstance(val, bool):
        attr.b = val
    elif isinstance(val, integer_types):
        attr.i = val
    elif isinstance(val, float):
        attr.f = val
    elif isinstance(val, binary_type) or isinstance(val, text_type):
        if hasattr(val, 'encode'):
            val = val.encode()
        attr.s = val
    elif isinstance(val, TensorShape):
        attr.shape.MergeFromString(val.SerializeToString())
    elif isinstance(val, list):
        if not val: return
        if isinstance(val[0], integer_types):
            attr.list.i.extend(val)
        elif isinstance(val[0], TensorShape):
            attr.list.shape.extend(val)
        else:
            raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0]))
    else:
        raise NotImplementedError('AttrValue cannot be of %s' % type(val))
serde_weights.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write_np_values(values, f):
    """
    Arguments:
        values: {str: np.array}
        f: filename or filelike object
    """
    with ZipFile(f, 'w') as zf:
        for k, v in values.items():
            # Need to do this because Python zipfile has some odd support for filenames:
            # http://bugs.python.org/issue24110
            if len(k) == 16 and isinstance(k, six.binary_type):  # valid UUID bytes
                zf.writestr(str(uuid.UUID(bytes=k)), v.tostring())
            else:
                zf.writestr(six.u(k), v.tostring())

        zf.writestr(MANIFEST_FILENAME, json_dumps_manifest(values))
test_simpleion.py 文件源码 项目:ion-python 作者: amzn 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def generate_scalars_binary(scalars_map, preceding_symbols=0):
    for ion_type, values in six.iteritems(scalars_map):
        for native, expected in values:
            native_expected = expected
            has_symbols = False
            if native is None:
                # An un-adorned 'None' doesn't contain enough information to determine its Ion type
                native_expected = b'\x0f'
            elif ion_type is IonType.CLOB:
                # All six.binary_type are treated as BLOBs unless wrapped by an _IonNature
                tid = six.byte2int(expected) + 0x10  # increment upper nibble for clob -> blob; keep lower nibble
                native_expected = bytearray([tid]) + expected[1:]
            elif ion_type is IonType.SYMBOL and native is not None:
                has_symbols = True
            elif ion_type is IonType.STRING:
                # Encode all strings as symbols too.
                symbol_expected = _serialize_symbol(
                    IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols)))
                yield _Parameter(IonType.SYMBOL.name + ' ' + native,
                                 IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True)
            yield _Parameter('%s %s' % (ion_type.name, native), native, native_expected, has_symbols)
            wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native)
            yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
util.py 文件源码 项目:aliyun-log-python-sdk 作者: aliyun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def convert_unicode_to_str(data):
        """
        Py2, always translate to utf8 from unicode
        Py3, always translate to unicode
        :param data:
        :return:
        """
        if six.PY2 and isinstance(data, six.text_type):
            return data.encode('utf8')
        elif six.PY3 and isinstance(data, six.binary_type):
            return data.decode('utf8')
        elif isinstance(data, collections.Mapping):
            return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v))
                        for k, v in six.iteritems(data))
        elif isinstance(data, collections.Iterable) and not isinstance(data, (six.binary_type, six.string_types)):
            return type(data)(map(Util.convert_unicode_to_str, data))

        return data
_util.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
test_fixture.py 文件源码 项目:deb-oslo.versionedobjects 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_fingerprint(self):
        # Make sure _get_fingerprint() generates a consistent fingerprint
        MyObject.VERSION = '1.1'
        argspec = 'vulpix'

        with mock.patch('inspect.getargspec') as mock_gas:
            mock_gas.return_value = argspec
            fp = self.ovc._get_fingerprint(MyObject.__name__)

        exp_fields = sorted(list(MyObject.fields.items()))
        exp_methods = sorted([('remotable_method', argspec),
                              ('remotable_classmethod', argspec)])
        expected_relevant_data = (exp_fields, exp_methods)
        expected_hash = hashlib.md5(six.binary_type(repr(
            expected_relevant_data).encode())).hexdigest()
        expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)

        self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
                                          "generate a correct fingerprint.")
test_fixture.py 文件源码 项目:deb-oslo.versionedobjects 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_get_fingerprint_with_child_versions(self):
        # Make sure _get_fingerprint() generates a consistent fingerprint
        # when child_versions are present
        child_versions = {'1.0': '1.0', '1.1': '1.1'}
        MyObject.VERSION = '1.1'
        MyObject.child_versions = child_versions
        argspec = 'onix'

        with mock.patch('inspect.getargspec') as mock_gas:
            mock_gas.return_value = argspec
            fp = self.ovc._get_fingerprint(MyObject.__name__)

        exp_fields = sorted(list(MyObject.fields.items()))
        exp_methods = sorted([('remotable_method', argspec),
                              ('remotable_classmethod', argspec)])
        exp_child_versions = collections.OrderedDict(sorted(
            child_versions.items()))
        exp_relevant_data = (exp_fields, exp_methods, exp_child_versions)

        expected_hash = hashlib.md5(six.binary_type(repr(
            exp_relevant_data).encode())).hexdigest()
        expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)

        self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
                                          "generate a correct fingerprint.")
decrypt_eval_data.py 文件源码 项目:callisto-core 作者: project-callisto 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _decrypt(self):
        decrypted_eval_data = []
        for row in EvalRow.objects.all():
            decrypted_row = {'pk': row.pk,
                             'user': row.user_identifier,
                             'record': row.record_identifier,
                             'action': row.action,
                             'timestamp': row.timestamp.__str__()}
            gpg = gnupg.GPG()
            gpg.import_keys(settings.CALLISTO_EVAL_PRIVATE_KEY)
            decrypted_eval_row = six.text_type(
                gpg.decrypt(six.binary_type(row.row)))
            if decrypted_eval_row:
                decrypted_row.update(json.loads(decrypted_eval_row))
            decrypted_eval_data.append(decrypted_row)
        return decrypted_eval_data
test_db.py 文件源码 项目:odoo-rpc-client 作者: katyukha 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_20_dump_drop_restore(self):
        # dump db
        dump_data = self.client.services.db.dump_db(self.env.super_password,
                                                    self.env.dbname)
        self.assertIsInstance(dump_data, six.binary_type)

        # drop it
        self.client.services.db.drop_db(self.env.super_password,
                                        self.env.dbname)
        self.assertNotIn(self.env.dbname, self.client.services.db)

        # and try to restore it
        self.client.services.db.restore_db(self.env.super_password,
                                           self.env.dbname,
                                           dump_data)
        self.assertIn(self.env.dbname, self.client.services.db)

        # check if objects were restored
        time.sleep(2)
        cl = self.client.login(self.env.dbname,
                               self.env.user,
                               self.env.password)
        self.assertIsNotNone(cl.uid)
        self.assertIsNotNone(cl.user)
_util.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
_helpers.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError(
            '{0!r} could not be converted to unicode'.format(value))
transport.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    contatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        The same dictionary but with all the keys converted to strings.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
    except UnicodeEncodeError:
        from oauth2client.client import NonAsciiHeaderError
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
transports.py 文件源码 项目:btlejuice-python-bindings 作者: DigitalSecurity 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def recv_packet(self):
        try:
            packet_text = self._connection.recv()
        except WebSocketTimeoutException as e:
            raise TimeoutError('recv timed out (%s)' % e)
        except SSLError as e:
            raise ConnectionError('recv disconnected by SSL (%s)' % e)
        except WebSocketConnectionClosedException as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        except SocketError as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        if not isinstance(packet_text, six.binary_type):
            packet_text = packet_text.encode('utf-8')
        engineIO_packet_type, engineIO_packet_data = parse_packet_text(
            packet_text)
        yield engineIO_packet_type, engineIO_packet_data
__init__.py 文件源码 项目:csodium 作者: ereOn 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def crypto_box(msg, nonce, pk, sk):
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
    _assert_len('sk', sk, crypto_box_SECRETKEYBYTES)

    c = bytearray(crypto_box_MACBYTES + len(msg))
    _raise_on_error(
        lib.crypto_box_easy(
            ffi.cast("unsigned char *", ffi.from_buffer(c)),
            msg,
            len(msg),
            nonce,
            pk,
            sk,
        ),
    )

    return binary_type(c)
__init__.py 文件源码 项目:csodium 作者: ereOn 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def crypto_box_open(c, nonce, pk, sk):
    _assert_min_len('c', c, crypto_box_MACBYTES)
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
    _assert_len('sk', sk, crypto_box_SECRETKEYBYTES)

    msg = bytearray(len(c) - crypto_box_MACBYTES)
    _raise_on_error(
        lib.crypto_box_open_easy(
            ffi.cast("unsigned char *", ffi.from_buffer(msg)),
            c,
            len(c),
            nonce,
            pk,
            sk,
        ),
    )

    return binary_type(msg)
__init__.py 文件源码 项目:csodium 作者: ereOn 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def crypto_box_open_afternm(c, nonce, k):
    _assert_min_len('c', c, crypto_box_MACBYTES)
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('k', k, crypto_box_BEFORENMBYTES)

    msg = bytearray(len(c) - crypto_box_MACBYTES)
    _raise_on_error(
        lib.crypto_box_open_easy_afternm(
            ffi.cast("unsigned char *", ffi.from_buffer(msg)),
            c,
            len(c),
            nonce,
            k,
        ),
    )

    return binary_type(msg)


问题


面经


文章

微信
公众号

扫码关注公众号