python类b()的实例源码

_helpers.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _parse_pem_key(raw_key_input):
    """Identify and extract PEM keys.

    Determines whether the given key is in the format of PEM key, and extracts
    the relevant part of the key if it is.

    Args:
        raw_key_input: The contents of a private key file (either PEM or
                       PKCS12).

    Returns:
        string, The actual key if the contents are from a PEM file, or
        else None.
    """
    offset = raw_key_input.find(b'-----BEGIN ')
    if offset != -1:
        return raw_key_input[offset:]
test_run_no_updates_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def setUp(self):
    """Prepare a temporary file-server directory with signed update metadata.

    Creates a fresh directory, exports it through PYUPDATER_FILESERVER_DIR,
    signs VERSIONS with PRIVATE_KEY, and writes gzip-compressed JSON dumps
    of KEYS and VERSIONS into that directory. Finally loads CLIENT_CONFIG
    and points it at PUBLIC_KEY.
    """
    # mkdtemp creates the directory atomically, avoiding the window that
    # exists when a temporary file name is reused for a later os.mkdir().
    self.fileServerDir = tempfile.mkdtemp()
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    # NOTE: this mutates the module-level VERSIONS dict.
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        # The file is opened in binary mode, so the JSON text must be
        # encoded; writing str here raises TypeError on Python 3.
        keysFile.write(json.dumps(KEYS, sort_keys=True).encode('utf-8'))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(
            json.dumps(VERSIONS, sort_keys=True).encode('utf-8'))
    # Set before the import below; presumably read by wxupdatedemo at
    # import time -- verify against wxupdatedemo.config.
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
test_run_update_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def setUp(self):
    """Prepare a temporary file-server directory with signed update metadata.

    Creates a fresh directory, exports it through PYUPDATER_FILESERVER_DIR,
    signs VERSIONS with PRIVATE_KEY, and writes gzip-compressed JSON dumps
    of KEYS and VERSIONS into that directory. Finally loads CLIENT_CONFIG
    and sets its PUBLIC_KEY and APP_NAME.
    """
    # mkdtemp creates the directory atomically, avoiding the window that
    # exists when a temporary file name is reused for a later os.mkdir().
    self.fileServerDir = tempfile.mkdtemp()
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    # NOTE: this mutates the module-level VERSIONS dict.
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        # The file is opened in binary mode, so the JSON text must be
        # encoded; writing str here raises TypeError on Python 3.
        keysFile.write(json.dumps(KEYS, sort_keys=True).encode('utf-8'))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(
            json.dumps(VERSIONS, sort_keys=True).encode('utf-8'))
    # Set before the import below; presumably read by wxupdatedemo at
    # import time -- verify against wxupdatedemo.config.
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
    self.clientConfig.APP_NAME = APP_NAME
test_query.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_clear(self):
    """Check that BatchStatement.clear() resets the batch.

    After adding a statement the batch exposes that statement's keyspace,
    routing key and custom payload; after clear() all of them are unset and
    the batch can take statements again.
    """
    expected_keyspace = 'keyspace'
    expected_routing_key = 'routing_key'
    expected_payload = {'key': six.b('value')}

    statement = SimpleStatement(
        'whatever',
        keyspace=expected_keyspace,
        routing_key=expected_routing_key,
        custom_payload=expected_payload)

    batch = BatchStatement()
    batch.add(statement)

    # Populated state.
    self.assertTrue(batch._statements_and_parameters)
    self.assertEqual(batch.keyspace, expected_keyspace)
    self.assertEqual(batch.routing_key, expected_routing_key)
    self.assertEqual(batch.custom_payload, expected_payload)

    batch.clear()

    # Cleared state.
    self.assertFalse(batch._statements_and_parameters)
    self.assertIsNone(batch.keyspace)
    self.assertIsNone(batch.routing_key)
    self.assertFalse(batch.custom_payload)

    # Adding to a cleared batch must not raise.
    batch.add(statement)
test_udts.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_non_frozen_udts(self):
    """
    Test to ensure that non frozen udt's work with C* >3.6.

    Creates a table with a non-frozen UDT column, updates a single field of
    the stored UDT value, and checks both the updated value and that the
    generated table CQL does not declare the column as frozen.

    @since 3.7.0
    @jira_ticket PYTHON-498
    @expected_result Non frozen UDT's are supported

    @test_category data_types, udt
    """
    table_name = self.function_table_name
    self.session.execute("USE {0}".format(self.keyspace_name))
    self.session.execute("CREATE TYPE user (state text, has_corn boolean)")
    self.session.execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(table_name))
    User = namedtuple('user', ('state', 'has_corn'))
    self.cluster.register_user_type(self.keyspace_name, "user", User)
    self.session.execute("INSERT INTO {0} (a, b) VALUES (%s, %s)".format(table_name), (0, User("Nebraska", True)))
    # Updating a single UDT field is the operation under test.
    self.session.execute("UPDATE {0} SET b.has_corn = False where a = 0".format(table_name))
    result = self.session.execute("SELECT * FROM {0}".format(table_name))
    self.assertFalse(result[0].b.has_corn)
    table_sql = self.cluster.metadata.keyspaces[self.keyspace_name].tables[table_name].as_cql_query()
    # CQL writes frozen types as "frozen<type>". The previous check for the
    # literal "<frozen>" could never match, so it could never fail.
    self.assertNotIn("frozen<", table_sql)
test_udts.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_raise_error_on_nonexisting_udts(self):
    """
    Test for ensuring that an error is raised for operating on a nonexisting udt or an invalid keyspace
    """

    c = Cluster(protocol_version=PROTOCOL_VERSION)
    try:
        s = c.connect(self.keyspace_name, wait_for_all_pools=True)
        User = namedtuple('user', ('age', 'name'))

        # Keyspace that is not expected to exist.
        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("some_bad_keyspace", "user", User)

        # Keyspace that does not define a "user" type.
        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("system", "user", User)

        # Referencing the undefined type in DDL must be rejected.
        with self.assertRaises(InvalidRequest):
            s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")
    finally:
        # Shut the cluster down even when an assertion above fails, so a
        # failing test does not leak the cluster and its connections.
        c.shutdown()
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_cluster_column_ordering_reversed_metadata(self):
    """
    Simple test to ensure that the metadata associated with clustering order is surfaced correctly.

    Creates a table with two clustering columns, one ascending and one
    descending, then checks the is_reversed flag reported for each column.
    @since 3.0.0
    @jira_ticket PYTHON-402
    @expected_result is_reversed is set on DESC order, and is False on ASC

    @test_category metadata
    """

    statement = self.make_create_statement(["a"], ["b", "c"], ["d"], compact=True)
    statement = statement + " AND CLUSTERING ORDER BY (b ASC, c DESC)"
    self.session.execute(statement)

    columns = self.get_table_metadata().columns
    # b is declared ASC, c is declared DESC.
    self.assertFalse(columns['b'].is_reversed)
    self.assertTrue(columns['c'].is_reversed)
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_cql_compatibility(self):
    """Check TableMetadata.is_cql_compatible for a compact table.

    Skipped on Cassandra 3.0 and later.
    """
    if CASS_SERVER_VERSION >= (3, 0):
        raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")

    # Several non-PK columns are fine as long as the table has no
    # clustering columns.
    self.session.execute(
        self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True))
    table = self.get_table_metadata()

    partition_key_names = [column.name for column in table.partition_key]
    self.assertEqual([u'a'], partition_key_names)
    self.assertEqual([], table.clustering_key)
    self.assertEqual([u'a', u'b', u'c', u'd'], sorted(table.columns.keys()))

    self.assertTrue(table.is_cql_compatible)

    # With clustering columns the table is no longer CQL compatible.
    # Clustering columns are simulated here by patching the metadata
    # object directly.
    table.clustering_key = ["foo", "bar"]
    for fake_column in ("foo", "bar"):
        table.columns[fake_column] = None
    self.assertFalse(table.is_cql_compatible)
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_indexes(self):
    """Check that secondary indexes show up in exported table and keyspace CQL."""
    table_ddl = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"])
    table_ddl += " WITH CLUSTERING ORDER BY (b ASC, c ASC)"
    execute_until_pass(self.session, table_ddl)

    qualified_name = (self.keyspace_name, self.function_table_name)
    d_index = "CREATE INDEX d_index ON %s.%s (d)" % qualified_name
    e_index = "CREATE INDEX e_index ON %s.%s (e)" % qualified_name
    for index_ddl in (d_index, e_index):
        execute_until_pass(self.session, index_ddl)

    # Split the exported table CQL into its non-empty statements.
    exported = self.get_table_metadata().export_as_string().strip()
    pieces = [piece.strip() for piece in exported.split(';')]
    statements = [piece for piece in pieces if piece]
    self.assertEqual(3, len(statements))
    self.assertIn(d_index, statements)
    self.assertIn(e_index, statements)

    # make sure indexes are included in KeyspaceMetadata.export_as_string()
    keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string()
    self.assertIn('CREATE INDEX d_index', keyspace_cql)
    self.assertIn('CREATE INDEX e_index', keyspace_cql)
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_non_size_tiered_compaction(self):
    """
    test options for non-size-tiered compaction strategy

    Creates a table with LeveledCompactionStrategy and one non-default
    option. Verifies that the option appears in the exported CQL and that
    the legacy table parameters (min_threshold, max_threshold) do not.

    @since 2.6.0
    @jira_ticket PYTHON-352
    @expected_result the options map for LeveledCompactionStrategy does not contain min_threshold, max_threshold

    @test_category metadata
    """
    ddl = self.make_create_statement(["a"], [], ["b", "c"])
    ddl += "WITH COMPACTION = {'class': 'LeveledCompactionStrategy', 'tombstone_threshold': '0.3'}"
    self.session.execute(ddl)

    cql = self.get_table_metadata().export_as_string()
    for expected_fragment in ("'tombstone_threshold': '0.3'", "LeveledCompactionStrategy"):
        self.assertIn(expected_fragment, cql)
    for legacy_option in ("min_threshold", "max_threshold"):
        self.assertNotIn(legacy_option, cql)
test_task_executor.py 文件源码 项目:captaincloud 作者: bpsagar 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_simple(self):
    """Run two 'test_executor_add' tasks on separate executors and check each output."""
    tasks = []
    for a_value, b_value in ((10, 20), (100, 200)):
        task = TaskRegistry.create('test_executor_add')
        task.Input.a = a_value
        task.Input.b = b_value
        tasks.append(task)

    # Keep every executor referenced until the assertions have run.
    executors = []
    for task in tasks:
        executor = TaskExecutor()
        executor.set_task(task)
        executor.execute()
        executors.append(executor)

    first_task, second_task = tasks
    self.assertEqual(first_task.Output.c, 30)
    self.assertEqual(second_task.Output.c, 300)
test_task_executor.py 文件源码 项目:captaincloud 作者: bpsagar 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_stream(self):
    """Copy a file through the 'test_executor_stream_copy' task and compare contents.

    The scratch files are created in the current working directory. The
    opened streams and the scratch files are cleaned up even when the task
    or an assertion fails.
    """
    payload = six.b('Hello World ') * 100
    in_stream = None
    out_stream = None
    try:
        with open('test.tmp', 'wb') as fd:
            fd.write(payload)

        task = TaskRegistry.create('test_executor_stream_copy')
        in_stream = open('test.tmp', 'rb')
        out_stream = open('test.out', 'wb')
        task.Input.stream = in_stream
        task.Output.stream = out_stream

        executor = TaskExecutor()
        executor.set_task(task)
        executor.execute()

        # Read before closing out_stream ourselves, so the check still
        # depends on the executor having flushed the output.
        with open('test.out', 'rb') as fd:
            self.assertEqual(fd.read(), payload)
    finally:
        # close() on an already-closed file is a no-op, so this is safe
        # whether or not the executor closed the streams.
        for stream in (in_stream, out_stream):
            if stream is not None:
                stream.close()
        for path in ('test.tmp', 'test.out'):
            if os.path.exists(path):
                os.unlink(path)
test_field.py 文件源码 项目:captaincloud 作者: bpsagar 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_string_field(self):
    """StringField has no initial value by default and rejects non-text values."""
    field = StringField()
    self.assertEqual(field.get_initial(), None)

    # An int, a float and a byte string must all be rejected.
    for rejected in (1, 1.5, six.b('ABC')):
        with self.assertRaises(InvalidValueException):
            field.validate(rejected)

    field_with_default = StringField(default=six.u('ABC'))
    self.assertEqual(field_with_default.get_initial(), six.u('ABC'))

    # A text value validates without raising.
    field_with_default.validate(six.u('hello'))
test_field.py 文件源码 项目:captaincloud 作者: bpsagar 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_byte_field(self):
    """ByteField has no initial value by default and rejects non-bytes values."""
    field = ByteField()
    self.assertEqual(field.get_initial(), None)

    # An int, a float and a text string must all be rejected.
    for rejected in (1, 1.5, six.u('ABC')):
        with self.assertRaises(InvalidValueException):
            field.validate(rejected)

    field_with_default = ByteField(default=six.b('ABC'))
    self.assertEqual(field_with_default.get_initial(), six.b('ABC'))

    # A byte string validates without raising.
    field_with_default.validate(six.b('hello'))
test_field.py 文件源码 项目:captaincloud 作者: bpsagar 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_struct_field(self):
    """StructField values expose their members as attributes, including nested structs."""
    # Flat struct.
    flat_field = StructField(a=IntegerField(), b=FloatField())
    flat_value = flat_field.create()
    flat_value.a = 100
    flat_value.b = 3.14
    self.assertEqual(flat_value.a, 100)

    # Struct holding another struct; the inner string member has a default.
    inner_field = StructField(
        c=FloatField(),
        d=StringField(default=six.u('hello world'))
    )
    outer_field = StructField(a=IntegerField(), b=inner_field)

    outer_value = outer_field.create()
    outer_value.a = 100
    outer_value.b.c = 3.14
    self.assertEqual(outer_value.b.c, 3.14)
    self.assertEqual(outer_value.b.d, six.u('hello world'))
test_io_handling.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_single_io_write_stream_encode_output(
        tmpdir,
        patch_aws_encryption_sdk_stream,
        patch_json_ready_header,
        patch_json_ready_header_auth
):
    """With encode_output enabled, _single_io_write writes the stream data base64-encoded."""
    # The patched stream yields DATA and carries a header attribute.
    fake_stream = io.BytesIO(DATA)
    fake_stream.header = MagicMock(encryption_context=sentinel.encryption_context)
    patch_aws_encryption_sdk_stream.return_value = fake_stream

    output_path = tmpdir.join('target')
    source = MagicMock()

    handler_kwargs = GOOD_IOHANDLER_KWARGS.copy()
    handler_kwargs.update(encode_output=True)
    handler = io_handling.IOHandler(**handler_kwargs)

    stream_args = {
        'mode': 'encrypt',
        'a': sentinel.a,
        'b': sentinel.b
    }
    with open(str(output_path), 'wb') as destination_writer:
        handler._single_io_write(
            stream_args=stream_args,
            source=source,
            destination_writer=destination_writer
        )

    assert output_path.read('rb') == base64.b64encode(DATA)
test_io_handling.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_should_write_file_does_exist(tmpdir, patch_input, interactive, no_overwrite, user_input, expected):
    """_should_write_file on an existing target honours the given flags and user answer."""
    # The target file exists, but is empty.
    existing_target = tmpdir.join('target')
    existing_target.write(b'')
    patch_input.return_value = user_input

    handler_kwargs = GOOD_IOHANDLER_KWARGS.copy()
    handler_kwargs.update(interactive=interactive, no_overwrite=no_overwrite)
    handler = io_handling.IOHandler(**handler_kwargs)

    decision = handler._should_write_file(str(existing_target))

    # Only the truthiness of the decision is compared with the expectation.
    assert bool(decision) == bool(expected)
metadata.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_metadata(self, **metadata):
    # type: (**Any) -> Optional[int]
    """Serialize the metadata as one JSON line and write it to the output stream.

    Nothing is written when output is suppressed. The line is JSON with
    sorted keys followed by ``os.linesep``. It is UTF-8 encoded first when
    the output mode contains ``'b'``.

    :param **metadata: JSON-serializeable metadata kwargs to write
    :returns: 0 when output is suppressed, otherwise whatever the output
        stream's ``write`` returns
    """
    if self.suppress_output:
        return 0  # wrote 0 bytes

    line = json.dumps(metadata, sort_keys=True) + os.linesep
    is_binary = 'b' in self._output_mode
    payload = line.encode('utf-8') if is_binary else line  # type: Union[str, bytes]
    return self._output_stream.write(payload)
util.py 文件源码 项目:workflows.kyoyue 作者: wizyoung 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def optimal_data_chunks(data, minimum=4):
    """
    An iterator returning QRData chunks optimized to the data content.

    The data is first split on runs of digits, which are yielded in number
    mode. Each remaining piece is split again on runs of ALPHA_NUM
    characters, which are yielded in alphanumeric mode. Everything else is
    yielded in 8-bit byte mode.

    :param data: The content to encode; converted with ``to_bytestring``.
    :param minimum: The minimum number of bytes in a row to split as a chunk.
    """
    data = to_bytestring(data)
    # Regex repetition suffix "{<minimum>,}", built as bytes.
    re_repeat = (
        six.b('{') + six.text_type(minimum).encode('ascii') + six.b(',}'))
    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    num_pattern = re.compile(six.b(r'\d') + re_repeat)
    num_bits = _optimal_split(data, num_pattern)
    alpha_pattern = re.compile(
        six.b('[') + re.escape(ALPHA_NUM) + six.b(']') + re_repeat)
    for is_num, chunk in num_bits:
        if is_num:
            yield QRData(chunk, mode=MODE_NUMBER, check_data=False)
        else:
            for is_alpha, sub_chunk in _optimal_split(chunk, alpha_pattern):
                if is_alpha:
                    mode = MODE_ALPHA_NUM
                else:
                    mode = MODE_8BIT_BYTE
                yield QRData(sub_chunk, mode=mode, check_data=False)
_helpers.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _parse_pem_key(raw_key_input):
    """Identify and extract PEM keys.

    Determines whether the given key is in the format of PEM key, and extracts
    the relevant part of the key if it is.

    Args:
        raw_key_input: The contents of a private key file (either PEM or
                       PKCS12).

    Returns:
        string, The actual key if the contents are from a PEM file, or
        else None.
    """
    offset = raw_key_input.find(b'-----BEGIN ')
    if offset != -1:
        return raw_key_input[offset:]
wall_envs.py 文件源码 项目:gym-extensions 作者: Breakend 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def rotate_vector(v, axis, theta):
    """
    Return v rotated counterclockwise about the given axis by theta radians.

    The axis is normalized first. A 3x3 rotation matrix is then built from
    the half-angle terms and applied to v.
    """
    direction = np.asarray(axis)
    direction = direction/math.sqrt(np.dot(direction, direction))
    half_angle = theta/2.0
    a = math.cos(half_angle)
    b, c, d = -direction*math.sin(half_angle)
    # Squares and pairwise products of the four parameters.
    aa, bb, cc, dd = a*a, b*b, c*c, d*d
    ab, ac, ad = a*b, a*c, a*d
    bc, bd, cd = b*c, b*d, c*d
    rotation = np.array([
        [aa+bb-cc-dd, 2*(bc+ad), 2*(bd-ac)],
        [2*(bc-ad), aa+cc-bb-dd, 2*(cd+ab)],
        [2*(bd+ac), 2*(cd-ab), aa+dd-bb-cc],
    ])
    return np.dot(rotation, v)
test_traceback.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_format_locals(self):  # checks that StackSummary.format() lists the locals captured for a frame
        def some_inner(k, v):  # k and v appear in the expected locals dump below
            a = 1  # appears in the expected locals dump below
            b = 2  # appears in the expected locals dump below
            return traceback.StackSummary.extract(
                traceback.walk_stack(None), capture_locals=True, limit=1)
        s = some_inner(3, 4)  # the expected output below holds a single entry, for some_inner
        self.assertEqual(
            ['  File "' + FNAME + '", line 651, '  # NOTE(review): hard-coded line number -- presumably tied to this test's position in its file; confirm before adding or removing lines above
             'in some_inner\n'
             '    traceback.walk_stack(None), capture_locals=True, limit=1)\n'
             '    a = 1\n'
             '    b = 2\n'
             '    k = 3\n'
             '    v = 4\n'
            ], s.format())
test_forwarding.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_host_override(self):
    """With port_forwarding.host configured, the response body is http://service/.

    The request carries an X-Forwarded-Port header of 8080.
    """
    forwarding_config = dict(host="service")
    loader = load_from_dict(port_forwarding=forwarding_config)
    client = self.init(loader)

    forwarded_headers = {
        "X-Forwarded-Port": "8080",
    }
    response = client.get("/", headers=forwarded_headers)

    assert_that(response.status_code, is_(equal_to(200)))
    assert_that(response.data, is_(equal_to(b("http://service/"))))
test_requests_staticmock.py 文件源码 项目:requests-staticmock 作者: tonybaloney 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_class_context_manager_good_factory():
    """A mock class returning StaticResponseFactory.GoodResponse yields that body, header and status."""
    session = Session()

    class TestMockClass(BaseMockClass):
        def _test_json(self, request):
            good_response = StaticResponseFactory.GoodResponse(
                request=request,
                body=b("it's my life"),
                headers={'now': 'never'},
                status_code=201
            )
            return good_response

    # Only the request itself runs inside the mocking context.
    with mock_session_with_class(session, TestMockClass, 'http://test.com'):
        response = session.get('http://test.com/test.json')

    assert response.text == "it's my life"
    headers = response.headers
    assert 'now' in headers.keys()
    assert headers['now'] == 'never'
    assert response.status_code == 201
test_requests_staticmock.py 文件源码 项目:requests-staticmock 作者: tonybaloney 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_class_context_manager_bad_factory():
    """A mock class returning StaticResponseFactory.BadResponse yields the default bad status code."""
    session = Session()

    class TestMockClass(BaseMockClass):
        def _test_json(self, request):
            bad_response = StaticResponseFactory.BadResponse(
                request=request,
                body=b("it's not over"),
                headers={'now': 'never'},
            )
            return bad_response

    # Only the request itself runs inside the mocking context.
    with mock_session_with_class(session, TestMockClass, 'http://test.com'):
        response = session.get('http://test.com/test.json')

    assert response.text == "it's not over"
    headers = response.headers
    assert 'now' in headers.keys()
    assert headers['now'] == 'never'
    assert response.status_code == DEFAULT_BAD_STATUS_CODE
test_traceback.py 文件源码 项目:deb-python-traceback2 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_format_locals(self):  # checks that StackSummary.format() lists the locals captured for a frame
        def some_inner(k, v):  # k and v appear in the expected locals dump below
            a = 1  # appears in the expected locals dump below
            b = 2  # appears in the expected locals dump below
            return traceback.StackSummary.extract(
                traceback.walk_stack(None), capture_locals=True, limit=1)
        s = some_inner(3, 4)  # the expected output below holds a single entry, for some_inner
        self.assertEqual(
            ['  File "' + FNAME + '", line 651, '  # NOTE(review): hard-coded line number -- presumably tied to this test's position in its file; confirm before adding or removing lines above
             'in some_inner\n'
             '    traceback.walk_stack(None), capture_locals=True, limit=1)\n'
             '    a = 1\n'
             '    b = 2\n'
             '    k = 3\n'
             '    v = 4\n'
            ], s.format())
http_error.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self, environ, start_response):
    """Rewrite the error body with microversion details, then delegate to the parent.

    The body is taken from the last chunk of ``self.app_iter`` when that
    chunk is valid JSON; otherwise an empty document is used. The supported
    version range, an error code, a help link and a title are added, and
    ``app_iter`` and the Content-Length header are replaced to match.

    NOTE(review): ``json`` must provide ``dump_as_bytes`` here, so it is
    presumably not the stdlib module -- confirm against the file's imports.
    """
    # Start from an empty document so an empty app_iter no longer leaves
    # `err` unbound.
    err = {}
    for err_str in self.app_iter:
        try:
            err = json.loads(err_str.decode('utf-8'))
        except ValueError:
            # Not JSON: fall back to an empty document for this chunk.
            err = {}

    links = {'rel': 'help', 'href': 'https://developer.openstack.org'
             '/api-guide/compute/microversions.html'}

    err['max_version'] = self.max_version
    err['min_version'] = self.min_version
    err['code'] = "zun.microversion-unsupported"
    err['links'] = [links]
    err['title'] = "Requested microversion is unsupported"

    # dump_as_bytes already yields bytes. Wrapping it in six.b() is a no-op
    # on Python 2 and fails on Python 3, where six.b() calls .encode().
    self.app_iter = [json.dump_as_bytes(err)]
    self.headers['Content-Length'] = str(len(self.app_iter[0]))

    return super(HTTPNotAcceptableAPIVersion, self).__call__(
        environ, start_response)
short_id.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    The supplied UUID must be a version 4 UUID, given either as a UUID
    object or as a string that ``uuid.UUID`` can parse.

    Raises ValueError when the UUID is not version 4.
    """
    if isinstance(source_uuid, six.string_types):
        parsed = uuid.UUID(source_uuid)
    else:
        parsed = source_uuid

    if parsed.version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % parsed.version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(parsed.time, 60)
    # The first 12 bytes (= 60 bits) of base32-encoded output is our data
    short_id = base64.b32encode(six.b(random_bytes))[:12].lower()

    # On Python 3 the encoded value is bytes and is decoded before returning.
    return short_id.decode('utf-8') if six.PY3 else short_id
test_packing.py 文件源码 项目:capnpy 作者: antocuni 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check(self, fmt, value):
    """Compare pack_into against struct.pack_into for one format and value.

    Both functions write ``value`` at the same offset into identical
    random buffers, which must end up equal. That verifies both the bytes
    written and that nothing outside them was touched. An offset that does
    not leave room for the value must raise IndexError.
    """
    from random import randrange
    # build a buffer which is surely big enough to contain what we need
    # and check:
    #   1) that we correctly write the bytes we expect
    #   2) that we do NOT write outside the bounds
    #
    pattern = b''.join(six.int2byte(randrange(256)) for _ in range(256))
    buf = bytearray(pattern)
    buf2 = bytearray(pattern)
    offset = 16
    pack_into(ord(fmt), buf, offset, value)
    struct.pack_into(fmt, buf2, offset, value)
    assert buf == buf2
    #
    # check that it raises if it's out of bound
    out_of_bound = 256-struct.calcsize(fmt)+1
    # Context-manager form: passing code as a string to pytest.raises is
    # no longer supported by current pytest releases.
    with pytest.raises(IndexError):
        pack_into(ord(fmt), buf, out_of_bound, value)
test_message.py 文件源码 项目:capnpy 作者: antocuni 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_dumps_alignment():
    """dumps() of a struct read from a raw buffer yields header, root pointer and payload."""
    class Person(Struct):
        pass

    payload = b('\x20\x00\x00\x00\x00\x00\x00\x00'   # age=32
                '\x01\x00\x00\x00\x2a\x00\x00\x00'   # name=ptr
                'J' 'o' 'h' 'n' '\x00\x00\x00\x00')  # John

    person = Person.from_buffer(payload, 0, data_size=1, ptrs_size=1)
    message = dumps(person)

    expected = b('\x00\x00\x00\x00\x04\x00\x00\x00'   # message header: 1 segment, size 4 words
                 '\x00\x00\x00\x00\x01\x00\x01\x00'   # ptr to payload
                 '\x20\x00\x00\x00\x00\x00\x00\x00'   # age=32
                 '\x01\x00\x00\x00\x2a\x00\x00\x00'   # name=ptr
                 'J' 'o' 'h' 'n' '\x00\x00\x00\x00')  # John
    assert message == expected


问题


面经


文章

微信
公众号

扫码关注公众号