python类UUID的实例源码

sessions.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _tag(value):
    """Recursively convert ``value`` into a tagged, serializable structure.

    Tuples, UUIDs, bytes, objects with a callable ``__html__`` and datetimes
    are wrapped in a single-key dict whose key (``' t'``, ``' u'``, ``' b'``,
    ``' m'``, ``' d'``) records which branch produced it. Lists and dicts are
    converted element by element, ``str`` values are passed through
    ``text_type`` and anything else is returned unchanged.

    The checks run in a fixed order and the first match wins.

    Raises:
        UnexpectedUnicodeError: if ``text_type(value)`` raises
            ``UnicodeError`` for a ``str`` value.
    """
    if isinstance(value, tuple):
        return {' t': [_tag(item) for item in value]}
    if isinstance(value, uuid.UUID):
        return {' u': value.hex}
    if isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    if callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    if isinstance(value, list):
        return [_tag(item) for item in value]
    if isinstance(value, datetime):
        return {' d': http_date(value)}
    if isinstance(value, dict):
        return dict((key, _tag(item)) for key, item in iteritems(value))
    if not isinstance(value, str):
        # Not a type that needs tagging: hand it back untouched.
        return value
    try:
        return text_type(value)
    except UnicodeError:
        # Imported lazily, only on the failure path.
        from flask.debughelpers import UnexpectedUnicodeError
        raise UnexpectedUnicodeError(u'A byte string with '
            u'non-ASCII data was passed to the session system '
            u'which can only store unicode strings.  Consider '
            u'base64 encoding your string (String was %r)' % value)
messaging.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_message_header(self) -> Dict:
        """Build the header dict for ``self._message``.

        The message id and the message type name are mandatory; the
        correlation id is written only when the message has one. The id and
        the correlation id are stored as ``str(...)`` of the header values.

        Returns:
            A dict keyed by ``message_id_header``, ``message_type_header``
            and, when present, ``message_correlation_id_header``.

        Raises:
            MessagingException: if the message id or the message type name
                is ``None``.
        """
        source = self._message.header
        header = {}

        # Required: message id.
        identity = source.id
        if identity is None:
            raise MessagingException("Missing id on message, this is a required field")
        header[message_id_header] = str(identity)

        # Required: message type (stored by name).
        type_name = source.message_type.name
        if type_name is None:
            raise MessagingException("Missing type on message, this is a required field")
        header[message_type_header] = type_name

        # Optional: correlation id.
        correlation_id = source.correlation_id
        if correlation_id is not None:
            header[message_correlation_id_header] = str(correlation_id)

        return header
auth_data_test.py 文件源码 项目:vsphere-storage-for-docker 作者: vmware 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_remove_vms(self):
        """Create a tenant with two VMs, remove them, and check none remain.

        After ``remove_vms`` reports no error, the row fetched from the vms
        table for this tenant is expected to be an empty list.
        """
        privileges = self.get_privileges()
        # These lookups are kept from the original flow; their results are
        # not used by the assertions below.
        self.get_datastore_url(self.get_default_datastore())

        vm_list = [(self.vm1_uuid, self.vm1_name), (self.vm2_uuid, self.vm2_name)]
        err, tenant = self.auth_mgr.create_tenant(
            name=self.tenant_name,
            description='Some tenant',
            vms=vm_list,
            privileges=privileges)
        self.assertEqual(err, None)
        # uuid.UUID() raises if the tenant id is not a well-formed UUID.
        self.assertTrue(uuid.UUID(tenant.id))

        err = tenant.remove_vms(self.auth_mgr.conn, vm_list)
        self.assertEqual(err, None)

        err, vms_row = auth.get_row_from_vms_table(self.auth_mgr.conn, tenant.id)
        self.assertEqual(err, None)
        self.assertEqual(vms_row, [])
auth_data_test.py 文件源码 项目:vsphere-storage-for-docker 作者: vmware 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_set_name(self):
        """Rename a tenant and check the tenants table holds the new name."""
        vm_list = [(self.vm1_uuid, self.vm1_name)]

        privileges = self.get_privileges()
        # These lookups are kept from the original flow; their results are
        # not used by the assertions below.
        self.get_datastore_url(self.get_default_datastore())

        err, tenant = self.auth_mgr.create_tenant(
            name=self.tenant_name,
            description='Some tenant',
            vms=vm_list,
            privileges=privileges)
        self.assertEqual(err, None)
        # uuid.UUID() raises if the tenant id is not a well-formed UUID.
        self.assertTrue(uuid.UUID(tenant.id))

        err = tenant.set_name(self.auth_mgr.conn, self.tenant_name, self.tenant_2_name)
        self.assertEqual(err, None)

        err, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant.id)
        self.assertEqual(err, None)
        self.assertEqual(tenants_row[auth_data_const.COL_NAME], self.tenant_2_name)
auth_data_test.py 文件源码 项目:vsphere-storage-for-docker 作者: vmware 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_set_description(self):
        """Change a tenant's description and check the stored value."""
        new_description = 'new description'
        vm_list = [(self.vm1_uuid, self.vm1_name)]
        privileges = self.get_privileges()

        err, tenant = self.auth_mgr.create_tenant(
            name=self.tenant_name,
            description='Some tenant',
            vms=vm_list,
            privileges=privileges)
        self.assertEqual(err, None)
        # uuid.UUID() raises if the tenant id is not a well-formed UUID.
        self.assertTrue(uuid.UUID(tenant.id))

        err = tenant.set_description(self.auth_mgr.conn, new_description)
        self.assertEqual(err, None)

        err, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant.id)
        self.assertEqual(err, None)
        self.assertEqual(tenants_row[auth_data_const.COL_DESCRIPTION], new_description)
auth_data_test.py 文件源码 项目:vsphere-storage-for-docker 作者: vmware 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_set_default_datastore(self):
        """Point a tenant at a new default datastore and check the stored URL."""
        vm_list = [(self.vm1_uuid, self.vm1_name)]
        privileges = self.get_privileges()
        # These lookups are kept from the original flow; their results are
        # not used by the assertions below.
        self.get_datastore_url(self.get_default_datastore())

        err, tenant = self.auth_mgr.create_tenant(
            name=self.tenant_name,
            description='Some tenant',
            vms=vm_list,
            privileges=privileges)
        self.assertEqual(err, None)
        # uuid.UUID() raises if the tenant id is not a well-formed UUID.
        self.assertTrue(uuid.UUID(tenant.id))

        new_datastore_url = self.get_datastore_url('new_default_ds')
        err = tenant.set_default_datastore(self.auth_mgr.conn, new_datastore_url)
        self.assertEqual(err, None)

        # Check tenants table
        err, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant.id)
        self.assertEqual(err, None)
        self.assertEqual(tenants_row[auth_data_const.COL_DEFAULT_DATASTORE_URL],
                         'new_default_ds_url')
get_face_dlib_per3.py 文件源码 项目:facerecognition 作者: guoxiaolu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_mac_address():
    """Return the value of ``uuid.getnode()`` as 12 lowercase hex digits.

    NOTE(review): ``uuid.getnode()`` is documented to fall back to a random
    number when no hardware address can be found, so the result is not
    guaranteed to be a real MAC address.
    """
    import uuid
    # Embed the node number in a UUID and keep its trailing 12 hex digits.
    return uuid.UUID(int=uuid.getnode()).hex[-12:]
manialink.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def handle(self, player, action, values, **kwargs):
        """Route a player action to the receivers registered under its name.

        Actions that do not start with this view's ``id`` are ignored. The
        action name is whatever follows the id plus a two-character
        separator. Unknown action names are forwarded to
        ``handle_catch_all``; otherwise every receiver in
        ``self.receivers[action_name]`` is called (and awaited when it is a
        coroutine function) with ``(player, action, values)``.

        This must be a coroutine function because the body awaits the
        receivers and ``handle_catch_all``.

        Args:
            player: Object exposing a ``login`` attribute.
            action: Full action string received for this view.
            values: Passed through unchanged to the receivers.
            **kwargs: Accepted and not used here.

        Returns:
            The result of ``handle_catch_all`` for an unknown action name,
            otherwise ``None``.

        Raises:
            ManialinkMemoryLeakException: if the view is neither shown
                globally nor shown to this player, and its id does not
                parse as a UUID.
            Exception: whatever a receiver raises, when
                ``self.throw_exceptions`` is truthy.
        """
        if not action.startswith(self.id):
            return

        if not self._is_global_shown and player.login not in self._is_player_shown:
            # Ignore if id is unique (uuid4)
            try:
                uuid.UUID(self.id, version=4)
            except (ValueError, TypeError, AttributeError):
                # These are the errors uuid.UUID raises for a malformed or
                # non-string id; anything else (e.g. KeyboardInterrupt) is
                # left to propagate.
                raise ManialinkMemoryLeakException(
                    'Old view instance (ml-id: {}) is not yet destroyed, but is receiving player callbacks!, '
                    'Make sure you are not removing old view instances with .destroy() and del variable! '
                    'Potential Memory Leak!! Should be fixed asap!'.format(self.id)
                )

        # Skip the id and the two characters that separate it from the name.
        action_name = action[len(self.id)+2:]
        if action_name not in self.receivers:
            return await self.handle_catch_all(player, action_name, values)

        # Call receivers.
        for rec in self.receivers[action_name]:
            try:
                if iscoroutinefunction(rec):
                    await rec(player, action, values)
                else:
                    rec(player, action, values)
            except Exception as e:
                if self.throw_exceptions:
                    raise
                else:
                    logging.exception('Exception has been silenced in ManiaLink Action receiver:', exc_info=e)
message.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _set_uuid(self, uuid):
        """Store ``uuid`` on ``self._uuid``, generating one when it is ``None``.

        Args:
            uuid: A value of length 16, or ``None`` to have one generated by
                ``self._fast_uuid.uuid4()``.

        Raises:
            TypeError: if a supplied ``uuid`` does not have length 16.
        """
        if uuid is not None:
            if len(uuid) != 16:
                raise TypeError(
                    "UUIDs should be exactly 16 bytes.  Conforming UUID's can be "
                    "generated with `import uuid; uuid.uuid4().bytes`."
                )
            self._uuid = uuid
            return

        # UUID generation is expensive.  Using FastUUID instead of the built
        # in UUID methods increases Messages that can be instantiated per
        # second from ~25,000 to ~185,000.  Not generating UUIDs at all
        # increases the throughput further still to about 730,000 per
        # second.
        self._uuid = self._fast_uuid.uuid4()
message.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def uuid_hex(self):
        """Return ``self.uuid`` (raw UUID bytes) as a 32-character hex string."""
        # TODO: DATAPIPE-848
        raw_bytes = self.uuid
        return UUID(bytes=raw_bytes).hex
tailer.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """Consume messages and print them until told to stop.

        A ``Consumer`` is opened under a freshly generated name and polled
        while ``self.keep_running(message_count)`` is true. Each message
        whose timestamp is below ``self.options.end_timestamp`` (or every
        message when no end timestamp is set) is formatted, printed and
        counted. The first message at or past the end timestamp sets
        ``self._running`` to ``False``.
        """
        logger.info(
            "Starting to consume from {}".format(self.topic_to_offsets_map)
        )

        # The tailer name should be unique - if it's not, partitions will
        # be split between multiple tailer instances
        tailer_name = 'data_pipeline_tailer-{}'.format(
            str(UUID(bytes=FastUUID().uuid4()).hex)
        )

        with Consumer(
            tailer_name,
            'bam',
            ExpectedFrequency.constantly,
            self.topic_to_offsets_map,
            auto_offset_reset=self.options.offset_reset_location,
            cluster_name=self.options.cluster_name
        ) as consumer:
            message_count = 0
            while self.keep_running(message_count):
                message = consumer.get_message(blocking=True, timeout=0.1)
                if message is None:
                    # Nothing arrived within the timeout; poll again.
                    continue

                if self.options.end_timestamp is None or message.timestamp < self.options.end_timestamp:
                    # Call form with a single argument prints the same text
                    # as the statement form.
                    print(self._format_message(message))
                    message_count += 1
                else:
                    self._running = False
                    logger.info(
                        "Latest message surpasses --end-timestamp. Stopping tailer..."
                    )
operations.py 文件源码 项目:Plamber 作者: OlegKlimenko 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def convert_uuidfield_value(self, value, expression, connection, context):
        """Convert a non-``None`` ``value`` to ``uuid.UUID``.

        ``None`` is returned unchanged. ``expression``, ``connection`` and
        ``context`` are accepted but not used.
        """
        # New in Django 1.8
        if value is None:
            return value
        return uuid.UUID(value)
__init__.py 文件源码 项目:jx-sqlite 作者: mozilla 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def generateGuid():
    """Return a random GUID as a string.

    The value comes from ``uuid4()``; as text, a UUID and a GUID have the
    same representation.

    :Returns:
        str, the generated random GUID.

    Example::

        a = generateGuid()
        import uuid
        print(a)
        print(uuid.UUID(a).hex)
    """
    guid = uuid4()
    return str(guid)
args.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def arg_uuid(name):
    """Fetch the request argument ``name`` and validate it as a UUID.

    Returns:
        ``str(uuid.UUID(value))`` for the argument, or ``None`` when the
        argument is absent.

    Raises:
        ValueError: propagated from ``uuid.UUID`` when the value is not a
            well-formed UUID.
    """
    raw = request.args.get(name)
    # uuid.UUID() will throw a ValueError if something's wrong; that is
    # deliberately left for the caller to handle.
    return None if raw is None else str(uuid.UUID(raw))
util.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def uuid_is_valid(test_uuid):
    """
    Determine if a UUID is valid

    Args:
        test_uuid: Candidate value, normally the textual form of a UUID.

    Returns:
        True when ``uuid.UUID(test_uuid)`` accepts the value, False
        otherwise. Non-string input such as ``None`` or an integer also
        yields False rather than raising.
    """
    try:
        uuid.UUID(test_uuid)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed string. TypeError / AttributeError: the
        # value is not a string at all (None, int, bytes, ...).
        return False
    return True
binary.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __new__(cls, obj):
        """Create the instance from a ``uuid.UUID``.

        The binary payload is ``obj.bytes`` with subtype
        ``OLD_UUID_SUBTYPE``; the original UUID object is kept on the
        instance.

        Raises:
            TypeError: if ``obj`` is not a ``UUID`` instance.
        """
        if isinstance(obj, UUID):
            instance = Binary.__new__(cls, obj.bytes, OLD_UUID_SUBTYPE)
            instance.__uuid = obj
            return instance
        raise TypeError("obj must be an instance of uuid.UUID")
binary.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def uuid(self):
        """Return the UUID instance wrapped by this UUIDLegacy instance."""
        wrapped = self.__uuid
        return wrapped
__init__.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_binary(data, position, dummy0, opts, dummy1):
    """Decode a BSON binary to bson.binary.Binary or python UUID.

    Reads a 5-byte header at ``position`` that unpacks to
    ``(length, subtype)``. Subtype 2 carries a second 4-byte length that
    must equal ``length - 4``. Subtypes 3 and 4 are decoded to
    ``uuid.UUID`` according to ``opts.uuid_representation``. On Python 3,
    subtype 0 is returned as a plain slice of ``data``; everything else is
    wrapped in ``Binary``.

    Args:
        data: Buffer being decoded.
        position: Offset of the binary element's header in ``data``.
        dummy0: Unused.
        opts: Object exposing ``uuid_representation``.
        dummy1: Unused.

    Returns:
        ``(value, end)`` where ``end`` is the offset just past the payload.

    Raises:
        InvalidBSON: if the inner length of a subtype-2 binary does not
            match the outer length.
    """
    length, subtype = _UNPACK_LENGTH_SUBTYPE(data[position:position + 5])
    position += 5

    if subtype == 2:
        inner_length = _UNPACK_INT(data[position:position + 4])[0]
        position += 4
        if inner_length != length - 4:
            raise InvalidBSON("invalid binary (st 2) - lengths don't match!")
        length = inner_length

    end = position + length
    payload = data[position:end]

    if subtype in (3, 4):
        representation = opts.uuid_representation
        if representation == JAVA_LEGACY:
            # Java Legacy: each 8-byte half is stored reversed.
            value = uuid.UUID(bytes=payload[0:8][::-1] + payload[8:16][::-1])
        elif representation == CSHARP_LEGACY:
            # C# legacy
            value = uuid.UUID(bytes_le=payload)
        else:
            # Python
            value = uuid.UUID(bytes=payload)
        return value, end

    # Python3 special case. Decode subtype 0 to 'bytes'.
    if PY3 and subtype == 0:
        return payload, end
    return Binary(payload, subtype), end
test_udts.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_can_insert_udts_with_all_datatypes(self):
        """
        Test for inserting all column types into a UserType

        Syncs the table for AllDatatypesModel, builds an AllDatatypes value
        with every field ``a`` through ``n`` populated, and stores it as the
        ``data`` of a single model row. The row is then read back and each
        of the fourteen fields is compared with the value that was written.

        @since 2.5.0
        @jira_ticket PYTHON-251
        @expected_result The UserType is inserted with each column type, and the resulting read yields proper data for each column.

        @test_category data_types:udt
        """
        sync_table(AllDatatypesModel)
        self.addCleanup(drop_table, AllDatatypesModel)

        written = AllDatatypes(a='ascii', b=2 ** 63 - 1, c=bytearray(b'hello world'), d=True,
                               e=datetime.utcfromtimestamp(872835240), f=Decimal('12.3E+7'), g=2.39,
                               h=3.4028234663852886e+38, i='123.123.123.123', j=2147483647, k='text',
                               l=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'),
                               m=UUID('067e6162-3b6f-4ae2-a171-2470b63dff00'), n=int(str(2147483647) + '000'))
        AllDatatypesModel.create(id=0, data=written)

        self.assertEqual(1, AllDatatypesModel.objects.count())
        read_back = AllDatatypesModel.objects.first().data

        # Fields are named by the letters 'a' through 'n'.
        for field in 'abcdefghijklmn':
            self.assertEqual(written[field], read_back[field])
test_udts.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_nested_udts_inserts(self):
        """
        Test for inserting collections of user types using cql engine.

        Defines a model whose ``names`` column is a list of a user type,
        inserts one row holding ten such values, and expects the insert to
        raise no exception. As a sanity check, the list read back must equal
        the list that was written. Per the original report, this combination
        of model and user type produced a syntax error in 2.5.1 due to
        improper quoting around the names collection.

        @since 2.6.0
        @jira_ticket PYTHON-311
        @expected_result No syntax exception thrown

        @test_category data_types:udt
        """

        class Name(UserType):
            type_name__ = "header"

            name = columns.Text()
            value = columns.Text()

        class Container(Model):
            id = columns.UUID(primary_key=True, default=uuid4)
            names = columns.List(columns.UserDefinedType(Name))

        # Construct the objects to insert
        names = [
            Name(name="name{0}".format(index), value="value{0}".format(index))
            for index in range(0, 10)
        ]

        # Create table, insert data
        sync_table(Container)
        self.addCleanup(drop_table, Container)

        Container.create(id=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'), names=names)

        # Validate input and output matches
        self.assertEqual(1, Container.objects.count())
        stored_names = Container.objects.first().names
        self.assertEqual(stored_names, names)


问题


面经


文章

微信
公众号

扫码关注公众号