def print_header(o):
formated_out = """Version:\t%(version)s
Creation Date:\t%(timestamp)s
Incremental:\t%(incremental)s
Volume Size:\t%(volume_size)s
Segment Size:\t%(segment_size)s
Backup Set:\t%(backup_set)s"""
dict_out = dict()
dict_out['backup_set'] = UUID(bytes=o.metadata['bases'][-1])
dict_out['version'] = o.metadata['version']
dict_out['incremental'] = o.metadata['incremental']
dict_out['timestamp'] = get_timestamp(o)
dict_out['volume_size'] = convert_size(o.metadata['sectors'] * 512)
dict_out['segment_size'] = convert_size(o.metadata['segment_size'])
print(formated_out % dict_out)
python类UUID的实例源码
def put_data(self, data_segment):
segment = data_segment[1]
data = self.wrap_data(data_segment[0], segment)
file_path = os.path.join(
self.storage_location,
str(uuid.UUID(bytes=segment.backupset_id)),
str(segment.incremental)
)
mkpath(file_path)
file_output = os.path.join(
file_path,
str(segment.segment)
)
with open(file_output, 'wb') as f:
f.write(data)
return segment
def api_is_task(url):
"""Determine if a URL looks like a valid task URL"""
# Note that this generates an extra array element because of the
# leading slash.
url_parts = urlparse.urlparse(url).path.split('/')
if len(url_parts) != 4 \
or (url_parts[:3] != ['', 'pscheduler', 'tasks' ]):
return False
try:
uuid.UUID(url_parts[3])
except ValueError:
return False
return True
def api_is_run(url):
"""Determine if a URL looks like a valid run URL"""
# Note that this generates an extra array element because of the
# leading slash.
url_parts = urlparse.urlparse(url).path.split('/')
if len(url_parts) != 6 \
or (url_parts[:3] != ['', 'pscheduler', 'tasks' ]) \
or (url_parts[4] != 'runs'):
return False
try:
uuid.UUID(url_parts[3])
uuid.UUID(url_parts[5])
except ValueError:
return False
return True
def _encode_uuid(name, value, dummy, opts):
"""Encode uuid.UUID."""
uuid_representation = opts.uuid_representation
# Python Legacy Common Case
if uuid_representation == OLD_UUID_SUBTYPE:
return b"\x05" + name + b'\x10\x00\x00\x00\x03' + value.bytes
# Java Legacy
elif uuid_representation == JAVA_LEGACY:
from_uuid = value.bytes
data = from_uuid[0:8][::-1] + from_uuid[8:16][::-1]
return b"\x05" + name + b'\x10\x00\x00\x00\x03' + data
# C# legacy
elif uuid_representation == CSHARP_LEGACY:
# Microsoft GUID representation.
return b"\x05" + name + b'\x10\x00\x00\x00\x03' + value.bytes_le
# New
else:
return b"\x05" + name + b'\x10\x00\x00\x00\x04' + value.bytes
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a ``TypeError``).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, datetime):
return http_date(o)
if isinstance(o, uuid.UUID):
return str(o)
if hasattr(o, '__html__'):
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)
def loads(self, value):
def object_hook(obj):
if len(obj) != 1:
return obj
the_key, the_value = next(iteritems(obj))
if the_key == ' t':
return tuple(the_value)
elif the_key == ' u':
return uuid.UUID(the_value)
elif the_key == ' b':
return b64decode(the_value)
elif the_key == ' m':
return Markup(the_value)
elif the_key == ' d':
return parse_date(the_value)
return obj
return json.loads(value, object_hook=object_hook)
def detach_volume(self, blockdevice_id):
"""
:param: volume id = blockdevice_id
:raises: unknownvolume exception if not found
"""
try:
dataset_id = UUID(blockdevice_id[6:])
except ValueError:
raise UnknownVolume(blockdevice_id)
volumesdetails = self.coprhdcli.get_volume_details("flocker-{}".format(dataset_id))
if not volumesdetails:
raise UnknownVolume(blockdevice_id)
if volumesdetails[volumesdetails.keys()[0]]['attached_to'] is not None:
Message.new(Info="coprhd detach_volume" + str(blockdevice_id)).write(_logger)
dataset_id = UUID(blockdevice_id[6:])
self.coprhdcli.unexport_volume("flocker-{}".format(dataset_id))
else:
Message.new(Info="Volume" + blockdevice_id + "not attached").write(_logger)
raise UnattachedVolume(blockdevice_id)
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a :exc:`TypeError`).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, date):
return http_date(o.timetuple())
if isinstance(o, uuid.UUID):
return str(o)
if hasattr(o, '__html__'):
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)
def _tag(value):
if isinstance(value, tuple):
return {' t': [_tag(x) for x in value]}
elif isinstance(value, uuid.UUID):
return {' u': value.hex}
elif isinstance(value, bytes):
return {' b': b64encode(value).decode('ascii')}
elif callable(getattr(value, '__html__', None)):
return {' m': text_type(value.__html__())}
elif isinstance(value, list):
return [_tag(x) for x in value]
elif isinstance(value, datetime):
return {' d': http_date(value)}
elif isinstance(value, dict):
return dict((k, _tag(v)) for k, v in iteritems(value))
elif isinstance(value, str):
try:
return text_type(value)
except UnicodeError:
from flask.debughelpers import UnexpectedUnicodeError
raise UnexpectedUnicodeError(u'A byte string with '
u'non-ASCII data was passed to the session system '
u'which can only store unicode strings. Consider '
u'base64 encoding your string (String was %r)' % value)
return value
def validate(self, value):
if value is None:
return True
if isinstance(value, UUID):
return True
if isinstance(value, six.string_types):
try:
UUID(value)
return True
except TypeError:
pass
except ValueError:
pass
return False
def default(self, o,
dates=(datetime.datetime, datetime.date),
times=(datetime.time,),
textual=(decimal.Decimal, uuid.UUID, DjangoPromise),
isinstance=isinstance,
datetime=datetime.datetime,
text_type=text_type):
if isinstance(o, dates):
if not isinstance(o, datetime):
o = datetime(o.year, o.month, o.day, 0, 0, 0, 0)
r = o.isoformat()
if r.endswith("+00:00"):
r = r[:-6] + "Z"
return r
elif isinstance(o, times):
return o.isoformat()
elif isinstance(o, textual):
return text_type(o)
else:
return super(JsonEncoder, self).default(o)
def __new__(cls, uuid, aik_path, vtpm, added=False):
"""
Args
----
uuid: str
The UUID for this VTPM Group
aik_path: str
The path to the .pem for this group's AIK
"""
if uuid in VTPMGroup._groups:
return VTPMGroup._groups[uuid]
obj = object.__new__(cls)
obj.uuid = uuid
obj.aik_path = aik_path
obj.vtpm = vtpm
if added:
VTPMGroup._groups[uuid] = obj
return obj
def show_group(group_num):
""" Returns info about group `group_num` using VTPM_ORD_GROUP_SHOW"""
out = {'num': group_num, 'vtpms': []}
body = vtpm_raw(0x1C2, struct.pack('>II', 0x02000107, group_num))
(uuid, pk, cfg) = struct.unpack('16s 256s 16s', body)
uuid = stringify_uuid(uuid)
logger.info('Group [%d] UUID: %s', group_num, uuid)
pk_hash = hashlib.sha1(pk).hexdigest()
logger.info(' PK Hash: %s', pk_hash)
logger.info(' Cfg list: %s', cfg.encode('hex'))
body = vtpm_cmd(VTPM_ORD_VTPM_LIST, struct.pack('>II', group_num, 0))
((num_vtpms,), body) = unpack('>I', body)
if num_vtpms > 0:
logger.info(' vTPMs: ')
vtpms = struct.unpack('16s' * num_vtpms, body)
vtpms = [stringify_uuid(vtpm) for vtpm in vtpms]
for i, vtpm in enumerate(vtpms):
logger.info(' [%d]: %s', i, vtpm)
out['vtpms'].append(vtpm)
out['uuid'] = uuid
return out
def get_group_info(num):
"""Returns UUID and path to the group AIK file for vtpm group `num`."""
# Get info for group `num`
ginfo = show_group(num)
uuid = ginfo['uuid']
aikname = '{0}_aik'.format(uuid)
pubaik_path = '{0}.pub'.format(aikname)
# Check that we have the group's AIK
if not os.path.exists(pubaik_path):
logger.error('Group %d AIK Path %r doesn\'t exist', num, pubaik_path)
raise OSError()
aikbase = '{0}'.format(aikname)
aikpem = aikbase + '.pem'
# Convert group AIK to PEM
check_call('tpmconv -ik {0} -ok {1}'.format(pubaik_path, aikbase),
shell=True)
return {'aikpem': aikpem, 'uuid': uuid}
def add_vtpm_group(rsa_mod=None):
""" Add new vtpm group"""
if common.STUB_TPM:
return (common.TEST_GROUP_UUID,common.TEST_HAIK,1,None)
logger.debug('Adding group')
if rsa_mod is None:
rsa_mod = '\x00' * 256
assert len(rsa_mod) == 256
ca_digest = '\x00' * 20
rsp = vtpm_cmd(VTPM_ORD_GROUP_NEW, ca_digest + rsa_mod)
(uuid, aik_pub, aik_priv_ca) = struct.unpack('16s256s256s', rsp)
uuid = struct.unpack(uuid_fmt, uuid)
uuid = '-'.join([part.encode('hex') for part in uuid])
logger.info('Created group with UUID: %s', uuid)
aikpem = tpmconv(aik_pub)
# return the group
group_num = get_group_num(uuid)
return (uuid,aikpem,group_num,aik_priv_ca)
def encodeSmallAttribute(self, attr):
"""
@since: 0.5
"""
obj = getattr(self, attr)
if not obj:
return obj
if attr in ['timestamp', 'timeToLive']:
return pyamf.util.get_timestamp(obj) * 1000.0
elif attr in ['clientId', 'messageId']:
if isinstance(obj, uuid.UUID):
return None
return obj
def testUUIDARRAY(self):
import uuid
psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
s = self.execute("SELECT %s AS foo", (u,))
self.assertTrue(u == s)
# array with a NULL element
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
s = self.execute("SELECT %s AS foo", (u,))
self.assertTrue(u == s)
# must survive NULL cast to a uuid[]
s = self.execute("SELECT NULL::uuid[] AS foo")
self.assertTrue(s is None)
# what about empty arrays?
s = self.execute("SELECT '{}'::uuid[] AS foo")
self.assertTrue(type(s) == list and len(s) == 0)
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a ``TypeError``).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, datetime):
return http_date(o)
if isinstance(o, uuid.UUID):
return str(o)
if hasattr(o, '__html__'):
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)
def loads(self, value):
def object_hook(obj):
if len(obj) != 1:
return obj
the_key, the_value = next(iteritems(obj))
if the_key == ' t':
return tuple(the_value)
elif the_key == ' u':
return uuid.UUID(the_value)
elif the_key == ' b':
return b64decode(the_value)
elif the_key == ' m':
return Markup(the_value)
elif the_key == ' d':
return parse_date(the_value)
return obj
return json.loads(value, object_hook=object_hook)
def test_send(header_timestamp_mock, pubsub_client_mock):
messaging = queue_messaging.Messaging.create_from_dict({
'TOPIC': 'test-topic',
})
model = FancyEvent(
uuid_field=uuid.UUID('cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e'),
string_field='Just testing!'
)
header_timestamp_mock.return_value = datetime.datetime(
2016, 12, 10, 11, 15, 45, 123456, tzinfo=datetime.timezone.utc)
messaging.send(model)
topic_mock = pubsub_client_mock.return_value.topic
publish_mock = topic_mock.return_value.publish
topic_mock.assert_called_with('test-topic')
publish_mock.assert_called_with(
test_utils.EncodedJson({
"uuid_field": "cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e",
"string_field": "Just testing!"
}),
timestamp='2016-12-10T11:15:45.123456Z',
type='FancyEvent'
)
def test_if_encode_raises_exception_with_invalid_data_and_strict_schema():
class StrictSchema(marshmallow.Schema):
uuid_field = fields.UUID(required=True)
class Meta:
strict = True
class Event(structures.Model):
class Meta:
schema = StrictSchema
type_name = 'Event'
data = Event(uuid_field='not an uuid')
with pytest.raises(exceptions.EncodingError) as excinfo:
encoding.encode(data)
assert str(excinfo.value) == (
"({'uuid_field': ['Not a valid UUID.']}, '')")
def generate_uuid(cls, return_hex=False, seed=None):
"""
Generate uuid
:param return_hex: Return in hex format
:param seed: Seed value to generate a consistent uuid
:return:
"""
if seed:
m = hashlib.md5()
m.update(seed.encode('utf-8'))
new_uuid = uuid.UUID(m.hexdigest())
else:
new_uuid = uuid.uuid1()
if return_hex:
return new_uuid.hex
return str(new_uuid)
def json_serial(obj):
"""
JSON serializer for objects not serializable by default json code.
:param obj: object to serialize
:type obj: date, datetime or UUID
:return: formatted and serialized object
:rtype: str
"""
if isinstance(obj, datetime.datetime) or isinstance(obj, datetime.date):
# Datetime serializer
return obj.isoformat()
elif isinstance(obj, uuid.UUID):
return str(obj)
raise TypeError("Type %s not serializable" % type(obj))
def test_uid(self):
class UidModel(properties.HasProperties):
uid = properties.Uuid('my uuid')
model = UidModel()
assert isinstance(model.uid, uuid.UUID)
with self.assertRaises(AttributeError):
model.uid = uuid.uuid4()
assert model.validate()
model._backend['uid'] = 'hi'
with self.assertRaises(ValueError):
model.validate()
json_uuid = uuid.uuid4()
json_uuid_str = str(json_uuid)
assert properties.Uuid.to_json(json_uuid) == json_uuid_str
assert str(properties.Uuid.from_json(json_uuid_str)) == json_uuid_str
assert properties.Uuid('').equal(uuid.UUID(int=0), uuid.UUID(int=0))
def bind_port(self, uuid, model, changes):
"""Called to bind port to VM.
:param uuid: UUID of Port
:param model: Model object
:returns: dict of vif parameters (vif_type, vif_details)
"""
LOG.info("bind_port: %s" % uuid)
LOG.info(changes)
port = model.ports.get(uuid, None)
if not port:
LOG.error("Cannot find port")
return dict()
retval = {'vif_type': 'ovs',
'vif_details': {'port_filter': False,
'bridge_name': 'br-int'}}
return retval
def modify_service(self, uuid, model, changes):
"""Called when attributes change on a bound port's service
:param uuid: UUID of Service
:param model: Model Object
:param changes: dictionary of changed attributes
:returns: None
"""
LOG.info("modify_service: %s" % uuid)
LOG.info(changes)
LOG.info("Creating or updating VPN instance")
vpn_instance = model.vpn_instances.get(uuid)
if vpn_instance:
self._create_or_update_service(vpn_instance)
else:
LOG.error("VPN instance %s not found" % uuid)
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a :exc:`TypeError`).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, date):
return http_date(o.timetuple())
if isinstance(o, uuid.UUID):
return str(o)
if hasattr(o, '__html__'):
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)
def _tag(value):
if isinstance(value, tuple):
return {' t': [_tag(x) for x in value]}
elif isinstance(value, uuid.UUID):
return {' u': value.hex}
elif isinstance(value, bytes):
return {' b': b64encode(value).decode('ascii')}
elif callable(getattr(value, '__html__', None)):
return {' m': text_type(value.__html__())}
elif isinstance(value, list):
return [_tag(x) for x in value]
elif isinstance(value, datetime):
return {' d': http_date(value)}
elif isinstance(value, dict):
return dict((k, _tag(v)) for k, v in iteritems(value))
elif isinstance(value, str):
try:
return text_type(value)
except UnicodeError:
from flask.debughelpers import UnexpectedUnicodeError
raise UnexpectedUnicodeError(u'A byte string with '
u'non-ASCII data was passed to the session system '
u'which can only store unicode strings. Consider '
u'base64 encoding your string (String was %r)' % value)
return value
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a :exc:`TypeError`).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, date):
return http_date(o.timetuple())
if isinstance(o, uuid.UUID):
return str(o)
if hasattr(o, '__html__'):
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)