def _tag(value):
if isinstance(value, tuple):
return {' t': [_tag(x) for x in value]}
elif isinstance(value, uuid.UUID):
return {' u': value.hex}
elif isinstance(value, bytes):
return {' b': b64encode(value).decode('ascii')}
elif callable(getattr(value, '__html__', None)):
return {' m': text_type(value.__html__())}
elif isinstance(value, list):
return [_tag(x) for x in value]
elif isinstance(value, datetime):
return {' d': http_date(value)}
elif isinstance(value, dict):
return dict((k, _tag(v)) for k, v in iteritems(value))
elif isinstance(value, str):
try:
return text_type(value)
except UnicodeError:
from flask.debughelpers import UnexpectedUnicodeError
raise UnexpectedUnicodeError(u'A byte string with '
u'non-ASCII data was passed to the session system '
u'which can only store unicode strings. Consider '
u'base64 encoding your string (String was %r)' % value)
return value
python类UUID的实例源码
def create_message_header(self) -> Dict:
def _add_correlation_id(brightstide_message_header: Dict, correlation_id: UUID) -> None:
if correlation_id is not None:
brightstide_message_header[message_correlation_id_header] = str(correlation_id)
def _add_message_id(brightside_message_header: Dict, identity: UUID) -> None:
if identity is None:
raise MessagingException("Missing id on message, this is a required field")
brightside_message_header[message_id_header] = str(identity)
def _add_message_type(brightside_message_header: Dict, brightside_message_type: str) -> None:
if brightside_message_type is None:
raise MessagingException("Missing type on message, this is a required field")
brightside_message_header[message_type_header] = brightside_message_type
header = {}
_add_message_id(header, self._message.header.id)
_add_message_type(header, self._message.header.message_type.name)
_add_correlation_id(header, self._message.header.correlation_id)
return header
def test_remove_vms(self):
"""
"""
privileges = self.get_privileges()
default_datastore = self.get_default_datastore()
default_datastore_url = self.get_datastore_url(default_datastore)
vms = [(self.vm1_uuid, self.vm1_name), (self.vm2_uuid, self.vm2_name)]
error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
description='Some tenant',
vms=vms,
privileges=privileges)
self.assertEqual(error_info, None)
self.assertTrue(uuid.UUID(tenant1.id))
error_info = tenant1.remove_vms(self.auth_mgr.conn, vms)
self.assertEqual(error_info, None)
error_info, vms_row = auth.get_row_from_vms_table(self.auth_mgr.conn, tenant1.id)
self.assertEqual(error_info, None)
self.assertEqual(vms_row, [])
def test_set_name(self):
vms = [(self.vm1_uuid, self.vm1_name)]
privileges = self.get_privileges()
default_datastore = self.get_default_datastore()
default_datastore_url = self.get_datastore_url(default_datastore)
error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
description='Some tenant',
vms=vms,
privileges=privileges)
self.assertEqual(error_info, None)
self.assertTrue(uuid.UUID(tenant1.id))
error_info = tenant1.set_name(self.auth_mgr.conn, self.tenant_name, self.tenant_2_name)
self.assertEqual(error_info, None)
error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant1.id)
self.assertEqual(error_info, None)
expected_output = self.tenant_2_name
actual_output = tenants_row[auth_data_const.COL_NAME]
self.assertEqual(actual_output, expected_output)
def test_set_description(self):
vms = [(self.vm1_uuid, self.vm1_name)]
privileges = self.get_privileges()
error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
description='Some tenant',
vms=vms,
privileges=privileges)
self.assertEqual(error_info, None)
self.assertTrue(uuid.UUID(tenant1.id))
error_info = tenant1.set_description(self.auth_mgr.conn, 'new description')
self.assertEqual(error_info, None)
error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant1.id)
self.assertEqual(error_info, None)
expected_output = 'new description'
actual_output = tenants_row[auth_data_const.COL_DESCRIPTION]
self.assertEqual(actual_output, expected_output)
def test_set_default_datastore(self):
vms = [(self.vm1_uuid, self.vm1_name)]
privileges = self.get_privileges()
default_datastore = self.get_default_datastore()
default_datastore_url = self.get_datastore_url(default_datastore)
error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
description='Some tenant',
vms=vms,
privileges=privileges)
self.assertEqual(error_info, None)
self.assertTrue(uuid.UUID(tenant1.id))
default_datastore = 'new_default_ds'
default_datastore_url = self.get_datastore_url(default_datastore)
error_info = tenant1.set_default_datastore(self.auth_mgr.conn, default_datastore_url)
self.assertEqual(error_info, None)
# Check tenants table
error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant1.id)
self.assertEqual(error_info, None)
expected_output = 'new_default_ds_url'
actual_output = tenants_row[auth_data_const.COL_DEFAULT_DATASTORE_URL]
self.assertEqual(actual_output, expected_output)
def get_mac_address():
import uuid
node = uuid.getnode()
mac = uuid.UUID(int=node).hex[-12:]
return mac
def handle(self, player, action, values, **kwargs):
if not action.startswith(self.id):
return
if not self._is_global_shown and player.login not in self._is_player_shown.keys():
# Ignore if id is unique (uuid4)
try:
uuid.UUID(self.id, version=4)
except:
raise ManialinkMemoryLeakException(
'Old view instance (ml-id: {}) is not yet destroyed, but is receiving player callbacks!, '
'Make sure you are not removing old view instances with .destroy() and del variable! '
'Potential Memory Leak!! Should be fixed asap!'.format(self.id)
)
action_name = action[len(self.id)+2:]
if action_name not in self.receivers:
return await self.handle_catch_all(player, action_name, values)
# Call receivers.
for rec in self.receivers[action_name]:
try:
if iscoroutinefunction(rec):
await rec(player, action, values)
else:
rec(player, action, values)
except Exception as e:
if self.throw_exceptions:
raise
else:
logging.exception('Exception has been silenced in ManiaLink Action receiver:', exc_info=e)
def _set_uuid(self, uuid):
if uuid is None:
# UUID generation is expensive. Using FastUUID instead of the built
# in UUID methods increases Messages that can be instantiated per
# second from ~25,000 to ~185,000. Not generating UUIDs at all
# increases the throughput further still to about 730,000 per
# second.
uuid = self._fast_uuid.uuid4()
elif len(uuid) != 16:
raise TypeError(
"UUIDs should be exactly 16 bytes. Conforming UUID's can be "
"generated with `import uuid; uuid.uuid4().bytes`."
)
self._uuid = uuid
def uuid_hex(self):
# TODO: DATAPIPE-848
return UUID(bytes=self.uuid).hex
def run(self):
logger.info(
"Starting to consume from {}".format(self.topic_to_offsets_map)
)
with Consumer(
# The tailer name should be unique - if it's not, partitions will
# be split between multiple tailer instances
'data_pipeline_tailer-{}'.format(
str(UUID(bytes=FastUUID().uuid4()).hex)
),
'bam',
ExpectedFrequency.constantly,
self.topic_to_offsets_map,
auto_offset_reset=self.options.offset_reset_location,
cluster_name=self.options.cluster_name
) as consumer:
message_count = 0
while self.keep_running(message_count):
message = consumer.get_message(blocking=True, timeout=0.1)
if message is not None:
if self.options.end_timestamp is None or message.timestamp < self.options.end_timestamp:
print self._format_message(message)
message_count += 1
else:
self._running = False
logger.info(
"Latest message surpasses --end-timestamp. Stopping tailer..."
)
def convert_uuidfield_value(self, value, expression, connection, context):
# New in Django 1.8
if value is not None:
value = uuid.UUID(value)
return value
def generateGuid():
"""Gets a random GUID.
Note: python's UUID generation library is used here.
Basically UUID is the same as GUID when represented as a string.
:Returns:
str, the generated random GUID.
a=GenerateGuid()
import uuid
print a
print uuid.UUID(a).hex
"""
return str(uuid4())
def arg_uuid(name):
"""Fetch and validate an argument as a UUID"""
argval = request.args.get(name)
if argval is None:
return None
# This will throw a ValueError if something's wrong.
return str(uuid.UUID(argval))
def uuid_is_valid(test_uuid):
"""
Determine if a UUID is valid
"""
try:
uuid_object = uuid.UUID(test_uuid)
except ValueError:
return False
return True
def __new__(cls, obj):
if not isinstance(obj, UUID):
raise TypeError("obj must be an instance of uuid.UUID")
self = Binary.__new__(cls, obj.bytes, OLD_UUID_SUBTYPE)
self.__uuid = obj
return self
def uuid(self):
"""UUID instance wrapped by this UUIDLegacy instance.
"""
return self.__uuid
def _get_binary(data, position, dummy0, opts, dummy1):
"""Decode a BSON binary to bson.binary.Binary or python UUID."""
length, subtype = _UNPACK_LENGTH_SUBTYPE(data[position:position + 5])
position += 5
if subtype == 2:
length2 = _UNPACK_INT(data[position:position + 4])[0]
position += 4
if length2 != length - 4:
raise InvalidBSON("invalid binary (st 2) - lengths don't match!")
length = length2
end = position + length
if subtype in (3, 4):
# Java Legacy
uuid_representation = opts.uuid_representation
if uuid_representation == JAVA_LEGACY:
java = data[position:end]
value = uuid.UUID(bytes=java[0:8][::-1] + java[8:16][::-1])
# C# legacy
elif uuid_representation == CSHARP_LEGACY:
value = uuid.UUID(bytes_le=data[position:end])
# Python
else:
value = uuid.UUID(bytes=data[position:end])
return value, end
# Python3 special case. Decode subtype 0 to 'bytes'.
if PY3 and subtype == 0:
value = data[position:end]
else:
value = Binary(data[position:end], subtype)
return value, end
def test_can_insert_udts_with_all_datatypes(self):
"""
Test for inserting all column types into a UserType
test_can_insert_udts_with_all_datatypes tests that each cqlengine column type can be inserted into a UserType.
It first creates a UserType that has each cqlengine column type, and a corresponding table/Model. It then creates
a UserType instance where all the fields have corresponding data, and inserts the UserType as an instance of the Model.
Finally, it verifies that each column read from the UserType from Cassandra is the same as the input parameters.
@since 2.5.0
@jira_ticket PYTHON-251
@expected_result The UserType is inserted with each column type, and the resulting read yields proper data for each column.
@test_category data_types:udt
"""
sync_table(AllDatatypesModel)
self.addCleanup(drop_table, AllDatatypesModel)
input = AllDatatypes(a='ascii', b=2 ** 63 - 1, c=bytearray(b'hello world'), d=True,
e=datetime.utcfromtimestamp(872835240), f=Decimal('12.3E+7'), g=2.39,
h=3.4028234663852886e+38, i='123.123.123.123', j=2147483647, k='text',
l=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'),
m=UUID('067e6162-3b6f-4ae2-a171-2470b63dff00'), n=int(str(2147483647) + '000'))
AllDatatypesModel.create(id=0, data=input)
self.assertEqual(1, AllDatatypesModel.objects.count())
output = AllDatatypesModel.objects.first().data
for i in range(ord('a'), ord('a') + 14):
self.assertEqual(input[chr(i)], output[chr(i)])
def test_nested_udts_inserts(self):
"""
Test for inserting collections of user types using cql engine.
test_nested_udts_inserts Constructs a model that contains a list of usertypes. It will then attempt to insert
them. The expectation is that no exception is thrown during insert. For sanity sake we also validate that our
input and output values match. This combination of model, and UT produces a syntax error in 2.5.1 due to
improper quoting around the names collection.
@since 2.6.0
@jira_ticket PYTHON-311
@expected_result No syntax exception thrown
@test_category data_types:udt
"""
class Name(UserType):
type_name__ = "header"
name = columns.Text()
value = columns.Text()
class Container(Model):
id = columns.UUID(primary_key=True, default=uuid4)
names = columns.List(columns.UserDefinedType(Name))
# Construct the objects and insert them
names = []
for i in range(0, 10):
names.append(Name(name="name{0}".format(i), value="value{0}".format(i)))
# Create table, insert data
sync_table(Container)
self.addCleanup(drop_table, Container)
Container.create(id=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'), names=names)
# Validate input and output matches
self.assertEqual(1, Container.objects.count())
names_output = Container.objects.first().names
self.assertEqual(names_output, names)