python类PY3的实例源码

_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
backend.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _bn_to_int(self, bn):
        assert bn != self._ffi.NULL
        if six.PY3:
            # Python 3 has constant time from_bytes, so use that.
            bn_num_bytes = self._lib.BN_num_bytes(bn)
            bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
            bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
            # A zero length means the BN has value 0
            self.openssl_assert(bin_len >= 0)
            return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
        else:
            # Under Python 2 the best we can do is hex()
            hex_cdata = self._lib.BN_bn2hex(bn)
            self.openssl_assert(hex_cdata != self._ffi.NULL)
            hex_str = self._ffi.string(hex_cdata)
            self._lib.OPENSSL_free(hex_cdata)
            return int(hex_str, 16)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_stack_output(self):
        self.assertEqual(u'<pre>foo</pre>', mappings.stack_output('foo'))
        self.assertEqual(u'', mappings.stack_output(None))

        outputs = ['one', 'two', 'three']
        # On Python 3, the pretty JSON output doesn't add space before newline
        if six.PY3:
            expected_text = """[\n  "one",\n  "two",\n  "three"\n]"""
        else:
            expected_text = """[\n  "one", \n  "two", \n  "three"\n]"""

        self.assertEqual(u'<pre>%s</pre>' % html.escape(expected_text),
                         mappings.stack_output(outputs))

        outputs = {'foo': 'bar'}
        expected_text = """{\n  "foo": "bar"\n}"""
        self.assertEqual(u'<pre>%s</pre>' % html.escape(expected_text),
                         mappings.stack_output(outputs))

        self.assertEqual(
            u'<a href="http://www.example.com/foo" target="_blank">'
            'http://www.example.com/foo</a>',
            mappings.stack_output('http://www.example.com/foo'))
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_admin_metadata_defs_create_namespace_invalid_json_post_raw(self):
        form_data = {
            'source_type': 'raw',
            'direct_input': 'invalidjson'
        }

        res = self.client.post(reverse(constants.METADATA_CREATE_URL),
                               form_data)

        if six.PY3:
            err_msg = ('There was a problem loading the namespace: '
                       'Expecting value: line 1 column 1 (char 0).')
        else:
            err_msg = ('There was a problem loading the namespace: '
                       'No JSON object could be decoded.')
        self.assertFormError(res, "form", None, [err_msg])
util.py 文件源码 项目:workflows.kyoyue 作者: wizyoung 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write(self, buffer):
        if self.mode == MODE_NUMBER:
            for i in xrange(0, len(self.data), 3):
                chars = self.data[i:i + 3]
                bit_length = NUMBER_LENGTH[len(chars)]
                buffer.put(int(chars), bit_length)
        elif self.mode == MODE_ALPHA_NUM:
            for i in xrange(0, len(self.data), 2):
                chars = self.data[i:i + 2]
                if len(chars) > 1:
                    buffer.put(
                        ALPHA_NUM.find(chars[0]) * 45 +
                        ALPHA_NUM.find(chars[1]), 11)
                else:
                    buffer.put(ALPHA_NUM.find(chars), 6)
        else:
            if six.PY3:
                # Iterating a bytestring in Python 3 returns an integer,
                # no need to ord().
                data = self.data
            else:
                data = [ord(c) for c in self.data]
            for c in data:
                buffer.put(c, 8)
files.py 文件源码 项目:quantrocket-client 作者: quantrocket-llc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def write_response_to_filepath_or_buffer(filepath_or_buffer, response):
    """
    Writes the response content to the filepath or buffer.
    """
    if hasattr(filepath_or_buffer, "write"):
        if six.PY3 and filepath_or_buffer is sys.stdout:
            # Write bytes to stdout (https://stackoverflow.com/a/23932488)
            filepath_or_buffer = filepath_or_buffer.buffer
        mode = getattr(filepath_or_buffer, "mode", "w")
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                if "b" not in mode and six.PY3:
                    chunk = chunk.decode("utf-8")
                filepath_or_buffer.write(chunk)
        if filepath_or_buffer.seekable():
            filepath_or_buffer.seek(0)
    else:
        with open(filepath_or_buffer, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
protocol.py 文件源码 项目:txamqp 作者: txamqp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def read_content(queue):
    frame = yield queue.get()
    header = frame.payload
    children = []
    for i in range(header.weight):
        content = yield read_content(queue)
        children.append(content)
    size = header.size
    read = 0
    buf = six.StringIO()
    while read < size:
        body = yield queue.get()
        content = body.payload.content

        # if this is the first instance of real binary content, convert the string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        if six.PY3 and isinstance(content, bytes) and isinstance(buf, six.StringIO):
            buf = six.BytesIO()

        buf.write(content)
        read += len(content)
    defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
mock.py 文件源码 项目:auger 作者: laffra 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _instance_callable(obj):
    """Given an object, return True if the object is callable.
    For classes, return True if instances would be callable."""
    if not isinstance(obj, ClassTypes):
        # already an instance
        return getattr(obj, '__call__', None) is not None

    if six.PY3:
        # *could* be broken by a class overriding __mro__ or __dict__ via
        # a metaclass
        for base in (obj,) + obj.__mro__:
            if base.__dict__.get('__call__') is not None:
                return True
    else:
        klass = obj
        # uses __bases__ instead of __mro__ so that we work with old style classes
        if klass.__dict__.get('__call__') is not None:
            return True

        for base in klass.__bases__:
            if _instance_callable(base):
                return True
    return False
url.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def query_params(self, value=None):
        """
        Return or set a dictionary of query params

        :param dict value: new dictionary of values
        """
        if value is not None:
            return URL._mutate(self, query=unicode_urlencode(value, doseq=True))
        query = '' if self._tuple.query is None else self._tuple.query

        # In Python 2.6, urlparse needs a bytestring so we encode and then
        # decode the result.
        if not six.PY3:
            result = parse_qs(to_utf8(query), True)
            return dict_to_unicode(result)

        return parse_qs(query, True)
db.py 文件源码 项目:panko 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_with(*drivers):
    """Used to mark tests that are only applicable for certain db driver.

    Skips test if driver is not available.
    """
    def decorator(test):
        if isinstance(test, type) and issubclass(test, TestBase):
            # Decorate all test methods
            for attr in dir(test):
                value = getattr(test, attr)
                if callable(value) and attr.startswith('test_'):
                    if six.PY3:
                        value._run_with = drivers
                    else:
                        value.__func__._run_with = drivers
        else:
            test._run_with = drivers
        return test
    return decorator
test_bin.py 文件源码 项目:panko 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _test_run_expirer_ttl_enabled(self, ttl_name, data_name):
        content = ("[database]\n"
                   "%s=1\n"
                   "connection=log://localhost\n" % ttl_name)
        if six.PY3:
            content = content.encode('utf-8')
        self.tempfile = fileutils.write_to_tempfile(content=content,
                                                    prefix='panko',
                                                    suffix='.conf')
        subp = subprocess.Popen(['panko-expirer',
                                 '-d',
                                 "--config-file=%s" % self.tempfile],
                                stdout=subprocess.PIPE)
        out, __ = subp.communicate()
        self.assertEqual(0, subp.poll())
        msg = "Dropping %s data with TTL 1" % data_name
        if six.PY3:
            msg = msg.encode('utf-8')
        self.assertIn(msg, out)
validate_submission_lib.py 文件源码 项目:cleverhans 作者: tensorflow 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _verify_docker_image_size(self, image_name):
    """Verifies size of Docker image.

    Args:
      image_name: name of the Docker image.

    Returns:
      True if image size is withing the limits, False otherwise.
    """
    shell_call(['docker', 'pull', image_name])
    try:
      image_size = subprocess.check_output(
          ['docker', 'inspect', '--format={{.Size}}', image_name]).strip()
      image_size = int(image_size) if PY3 else long(image_size)
    except (ValueError, subprocess.CalledProcessError) as e:
      logging.error('Failed to determine docker image size: %s', e)
      return False
    logging.info('Size of docker image %s is %d', image_name, image_size)
    if image_size > MAX_DOCKER_IMAGE_SIZE:
      logging.error('Image size exceeds limit %d', MAX_DOCKER_IMAGE_SIZE)
    return image_size <= MAX_DOCKER_IMAGE_SIZE
jsarray.py 文件源码 项目:tvlinker 作者: ozmartian 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sort(cmpfn):
        if not this.Class in ('Array', 'Arguments'):
            return this.to_object() # do nothing
        arr = []
        for i in xrange(len(this)):
            arr.append(this.get(six.text_type(i)))

        if not arr:
            return this
        if not cmpfn.is_callable():
            cmpfn = None
        cmp = lambda a,b: sort_compare(a, b, cmpfn)
        if six.PY3:
            key = functools.cmp_to_key(cmp)
            arr.sort(key=key)
        else:
            arr.sort(cmp=cmp)
        for i in xrange(len(arr)):
            this.put(six.text_type(i), arr[i])

        return this
util.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def compute_signature(self, uri, params, utf=PY3):
        """Compute the signature for a given request

        :param uri: full URI that Twilio requested on your server
        :param params: post vars that Twilio sent with the request
        :param utf: whether return should be bytestring or unicode (python3)

        :returns: The computed signature
        """
        s = uri
        if len(params) > 0:
            for k, v in sorted(params.items()):
                s += k + v

        # compute signature and compare signatures
        mac = hmac.new(self.token, s.encode("utf-8"), sha1)
        computed = base64.b64encode(mac.digest())
        if utf:
            computed = computed.decode('utf-8')

        return computed.strip()
test_sync_client_hooks.py 文件源码 项目:opentracing-python-instrumentation 作者: uber-common 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def install_hooks(request):
    urllibver = request.getfixturevalue('urllibver')

    if urllibver == 'urllib2':
        if six.PY3:
            yield None
            return
        module = urllib2
    else:
        module = urllib.request

    old_opener = module._opener
    old_callee_headers = CONFIG.callee_name_headers
    old_endpoint_headers = CONFIG.callee_endpoint_headers

    urllib2_hooks.install_patches.__original_func()
    CONFIG.callee_name_headers = ['Remote-Loc']
    CONFIG.callee_endpoint_headers = ['Remote-Op']

    yield module

    module.install_opener(old_opener)
    CONFIG.callee_name_headers = old_callee_headers
    CONFIG.callee_endpoint_headers = old_endpoint_headers
fake_client.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, data):
        super(TestResponse, self).__init__()
        self._content_consumed = True
        if isinstance(data, dict):
            self.status_code = data.get('status_code', 200)
            # Fake the text attribute to streamline Response creation
            text = data.get('text', "")
            if isinstance(text, (dict, list)):
                self._content = json.dumps(text)
                default_headers = {
                    "Content-Type": "application/json",
                }
            else:
                self._content = text
                default_headers = {}
            if six.PY3 and isinstance(self._content, six.string_types):
                self._content = self._content.encode('utf-8', 'strict')
            self.headers = data.get('headers') or default_headers
        else:
            self.status_code = data
test_rrule.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testLongIntegers(self):
        if not PY3:  # There is no longs in python3
            self.assertEqual(list(rrule(MINUTELY,
                                  count=long(2),
                                  interval=long(2),
                                  bymonth=long(2),
                                  byweekday=long(3),
                                  byhour=long(6),
                                  byminute=long(6),
                                  bysecond=long(6),
                                  dtstart=datetime(1997, 9, 2, 9, 0))),
                             [datetime(1998, 2, 5, 6, 6, 6),
                              datetime(1998, 2, 12, 6, 6, 6)])
            self.assertEqual(list(rrule(YEARLY,
                                  count=long(2),
                                  bymonthday=long(5),
                                  byweekno=long(2),
                                  dtstart=datetime(1997, 9, 2, 9, 0))),
                             [datetime(1998, 1, 5, 9, 0),
                              datetime(2004, 1, 5, 9, 0)])
test_rrule.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testToStrLongIntegers(self):
        if not PY3:  # There is no longs in python3
            self._rrulestr_reverse_test(rrule(MINUTELY,
                                  count=long(2),
                                  interval=long(2),
                                  bymonth=long(2),
                                  byweekday=long(3),
                                  byhour=long(6),
                                  byminute=long(6),
                                  bysecond=long(6),
                                  dtstart=datetime(1997, 9, 2, 9, 0)))

            self._rrulestr_reverse_test(rrule(YEARLY,
                                  count=long(2),
                                  bymonthday=long(5),
                                  byweekno=long(2),
                                  dtstart=datetime(1997, 9, 2, 9, 0)))
_common.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tzname_in_python2(namefunc):
    """Change unicode output into bytestrings in Python 2

    tzname() API changed in Python 3. It used to return bytes, but was changed
    to unicode strings
    """
    def adjust_encoding(*args, **kwargs):
        name = namefunc(*args, **kwargs)
        if name is not None and not PY3:
            name = name.encode()

        return name

    return adjust_encoding


# The following is adapted from Alexander Belopolsky's tz library
# https://github.com/abalkin/tz
short_id.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    The supplied UUID must be a version 4 UUID object.
    """

    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)
    if source_uuid.version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % source_uuid.version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)
    # The first 12 bytes (= 60 bits) of base32-encoded output is our data
    encoded = base64.b32encode(six.b(random_bytes))[:12]

    if six.PY3:
        return encoded.lower().decode('utf-8')
    else:
        return encoded.lower()


问题


面经


文章

微信
公众号

扫码关注公众号