def id_from_name(name):
"""Generate a UUID using a name as the namespace
:type name: str
:rtype: str
"""
return str(uuid.uuid3(uuid.NAMESPACE_DNS, name)).upper()
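Because uuid3 is an MD5 hash of namespace and name, the same inputs always produce the same ID. A minimal sketch of that determinism (the hostname is arbitrary):

import uuid

# Equal namespace + name pairs always yield the same version-3 UUID.
a = uuid.uuid3(uuid.NAMESPACE_DNS, "example.org")
b = uuid.uuid3(uuid.NAMESPACE_DNS, "example.org")
assert a == b and a.version == 3
print(str(a).upper())  # stable across runs and machines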
def create_uuid(self):
if not self.version or self.version == 4:
return uuid.uuid4()
elif self.version == 1:
return uuid.uuid1(self.node, self.clock_seq)
elif self.version == 2:
raise UUIDVersionError("UUID version 2 is not supported.")
elif self.version == 3:
return uuid.uuid3(self.namespace, self.name)
elif self.version == 5:
return uuid.uuid5(self.namespace, self.name)
else:
raise UUIDVersionError("UUID version %s is not valid." % self.version)
def test_create_timeuuid_with_uuid4_string_should_fail(self):
    '''Creating a TimeUUID from a hex uuid4 string should fail.'''
    for i in range(1, 100):
        u = uuid.uuid4()
        with self.assertRaises(ValueError) as cm:
            t = timeuuid.TimeUUID(s=u.hex)
        self.assertEqual(str(cm.exception), 'Invalid UUID type')
    for fn in [uuid.uuid3, uuid.uuid5]:
        for i in range(1, 100):
            u = fn(uuid.NAMESPACE_DNS, str(os.urandom(10)))
            with self.assertRaises(ValueError) as cm:
                t = timeuuid.TimeUUID(s=u.hex)
            self.assertEqual(str(cm.exception), 'Invalid UUID type')
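The rejection these tests rely on can be implemented by checking the version field encoded in the UUID itself; a minimal sketch (an assumption, not the timeuuid library's actual code):

import uuid

def require_time_uuid(hex_string):
    """Accept only version-1 (time-based) UUIDs, as the tests above expect."""
    u = uuid.UUID(hex=hex_string)
    if u.version != 1:
        raise ValueError('Invalid UUID type')
    return u

require_time_uuid(uuid.uuid1().hex)    # passes: v1 carries a timestamp
# require_time_uuid(uuid.uuid4().hex)  # raises ValueError('Invalid UUID type')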
def generate(authorization_id):
"""
Generate an access token based on an (unexpired) authorization id.
"""
auth = authorizationcode.Authorizationcode.get('id=%s', authorization_id)
    consumer_key = uuid.uuid3(uuid.NAMESPACE_DNS,
                              base36encode(auth.id) + '-' + base36encode(auth.app_id))
    # hashlib digests require bytes in Python 3, hence the encode()
    consumer_secret = sha224(("%s%s" % (str(uuid.uuid1()), time.time())).encode()).hexdigest()
if auth.expires_at > datetime.utcnow():
access_token = Accesstoken(user_id=auth.user_id, app_id=auth.app_id, consumer_key=str(consumer_key), consumer_secret=str(consumer_secret))
access_token.save()
return access_token
else:
return None
def uuid(self, name):
    """Return a unique ID derived from name and the current time."""
    if not name:
        name = self.def_name
    try:
        # A fresh uuid4 as the namespace makes each call effectively random.
        return str(uuid.uuid3(uuid.uuid4(), str(name) + str(time.time())))
    except (NameError, AttributeError):
        # Fallback for environments without a usable uuid module.
        return "%s-%s-%s-%s-%s" % (
            self._rnd(8), self._rnd(4), self._rnd(4),
            self._rnd(4), self._rnd(12))
def filename(self, filename):
    """Return a unique filename prefixed with the default name and the PID."""
    prefix = self.def_name + "-" + str(os.getpid()) + "-"
    try:
        return (prefix +
                str(uuid.uuid3(uuid.uuid4(), str(time.time()))) +
                "-" + str(filename))
    except (NameError, AttributeError):
        return prefix + self.uuid(filename) + "-" + str(filename)
def uuid_for_string(s):
return uuid.uuid3(uuid.NAMESPACE_OID, s)
def get_mount_point(block):
global collection
global messages
bus = dbus.SystemBus()
obj = bus.get_object('org.freedesktop.UDisks2', block)
    iface = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')  # generic D-Bus Properties interface for UDisks2 objects
dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    mount_point = ''
    # MountPoints may be empty right after the device appears; poll until it is set.
    while not dbus_mount_point:
        time.sleep(0.5)
        dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
dbus_id = iface.Get('org.freedesktop.UDisks2.Block', 'Id')
dbus_name = iface.Get('org.freedesktop.UDisks2.Block', 'IdLabel')
dbus_space = iface.Get('org.freedesktop.UDisks2.Block', 'Size')
    # MountPoints is an array of NUL-terminated byte arrays; decode the first entry.
    for letter in dbus_mount_point[0]:
        mount_point += chr(letter)
    if not dbus_name:
        # Fall back to the last path component of the mount point.
        dbus_name = mount_point[:-1].split(os.sep)[-1]
if not dbus_id:
dbus_id = uuid.uuid3(uuid.uuid4(), dbus_name)
collection[block] = [str(mount_point[:-1]), str(dbus_id), str(dbus_name), None, None, dbus_space]
messages.append(
'You have a new device connected (' + dbus_name + ', ' + extra_functions.convert_to_human_readable(
dbus_space) + '). To have JF track it, execute:' + '\n' + ' jf ' + '-i ' + str(mount_point[:-1]))
return dbus_id, block, dbus_name
def attachment_marker(raw_stream_id: uuid.UUID, stream_name: str, owner_id: uuid.UUID, dd_stream_name,
                      CC: CerebralCortex, config: dict):
    """
    Label sensor data as sensor-on-body, sensor-off-body, or improper-attachment.
    All the labeled windows (st, et, label) and their metadata are then stored in a datastore.
    """
    # TODO: quality streams could be multiple, so find the one computed with CC
    # Combine stream_id, data-diagnostic stream name, and owner id into a unique,
    # reproducible stream ID for the attachment marker.
    attachment_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS,
                                             str(raw_stream_id) + dd_stream_name + str(owner_id))
stream_days = get_stream_days(raw_stream_id, attachment_marker_stream_id, CC)
for day in stream_days:
# load stream data to be diagnosed
raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)
if len(raw_stream.data) > 0:
windowed_data = window(raw_stream.data, config['general']['window_size'], True)
results = process_windows(windowed_data, config)
merged_windows = merge_consective_windows(results)
input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
output_stream = {"id": attachment_marker_stream_id, "name": dd_stream_name,
"algo_type": config["algo_type"]["attachment_marker"]}
metadata = get_metadata(dd_stream_name, input_streams, config)
store(merged_windows, input_streams, output_stream, metadata, CC, config)
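The markers in this file all derive their output-stream ID the same way: uuid3 over the concatenated input-stream ID, marker name, and owner, so re-running the pipeline regenerates the same stream ID instead of a fresh random one. A standalone sketch with made-up identifiers:

import uuid

def derive_stream_id(raw_stream_id, dd_stream_name, owner_id):
    """Reproducible output-stream ID for a data-diagnostic marker."""
    return uuid.uuid3(uuid.NAMESPACE_DNS,
                      str(raw_stream_id) + dd_stream_name + str(owner_id))

rid, oid = uuid.uuid4(), uuid.uuid4()
assert derive_stream_id(rid, "attachment-marker", oid) == \
       derive_stream_id(rid, "attachment-marker", oid)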
def phone_screen_touch_marker(raw_stream_id: uuid.UUID, raw_stream_name: str, owner_id, dd_stream_name,
                              CC: CerebralCortex, config: dict, start_time=None, end_time=None):
    """
    Not part of the core data diagnostic suite; it only counts screen touches.
    :param raw_stream_id:
    :param CC:
    :param config:
    """
    try:
        # Combine stream_id, data-diagnostic stream name, and owner id into a unique,
        # reproducible stream ID for the screen-touch marker.
        screen_touch_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id) + dd_stream_name +
                                            str(owner_id) + "mobile phone screen touch marker")
stream_days = get_stream_days(raw_stream_id, screen_touch_stream_id, CC)
for day in stream_days:
stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day, start_time=start_time,
end_time=end_time)
if len(stream.data) > 0:
windowed_data = window(stream.data, config['general']['window_size'], True)
results = process_windows(windowed_data)
merged_windows = merge_consective_windows(results)
if len(merged_windows) > 0:
input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": raw_stream_name}]
output_stream = {"id": screen_touch_stream_id, "name": dd_stream_name,
"algo_type": config["algo_type"]["app_availability_marker"]}
metadata = get_metadata(dd_stream_name, input_streams, config)
store(merged_windows, input_streams, output_stream, metadata, CC, config)
except Exception as e:
print(e)
def battery_marker(raw_stream_id: uuid.UUID, stream_name: str, owner_id, dd_stream_name, CC: CerebralCortex,
                   config: dict, start_time=None, end_time=None):
    """
    Use battery percentages to decide whether the device was powered off or its battery was low.
    All the labeled windows (st, et, label) and their metadata are then stored in a datastore.
    :param raw_stream_id:
    :param CC:
    :param config:
    """
    try:
        # Combine stream_id, data-diagnostic stream name, and owner id into a unique,
        # reproducible stream ID for the battery marker.
        battery_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS,
                                              str(raw_stream_id) + dd_stream_name + str(owner_id))
stream_days = get_stream_days(raw_stream_id, battery_marker_stream_id, CC)
for day in stream_days:
stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day)
if len(stream.data) > 0:
windowed_data = window(stream.data, config['general']['window_size'], True)
results = process_windows(windowed_data, stream_name, config)
merged_windows = merge_consective_windows(results)
if len(merged_windows) > 0:
input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
output_stream = {"id": battery_marker_stream_id, "name": dd_stream_name,
"algo_type": config["algo_type"]["battery_marker"]}
labelled_windows = mark_windows(battery_marker_stream_id, merged_windows, CC, config)
metadata = get_metadata(dd_stream_name, input_streams, config)
store(labelled_windows, input_streams, output_stream, metadata, CC, config)
except Exception as e:
print(e)
def sensor_availability(raw_stream_id: uuid.UUID, stream_name: str, owner_id: uuid.UUID, dd_stream_name,
                        phone_physical_activity, CC: CerebralCortex, config: dict):
    """
    Mark missing data as a wireless disconnection when a participant walks away from the phone or sensor.
    :param raw_stream_id:
    :param stream_name:
    :param owner_id:
    :param dd_stream_name:
    :param phone_physical_activity:
    :param CC:
    :param config:
    """
    # Combine stream_id, data-diagnostic stream name, and owner id into a unique,
    # reproducible stream ID for the wireless-disconnection marker.
    wireless_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS,
                                           str(raw_stream_id) + dd_stream_name + str(owner_id))
stream_days = get_stream_days(raw_stream_id, wireless_marker_stream_id, CC)
for day in stream_days:
# load stream data to be diagnosed
raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)
if len(raw_stream.data) > 0:
windowed_data = window(raw_stream.data, config['general']['window_size'], True)
results = process_windows(windowed_data, day, CC, phone_physical_activity, config)
merged_windows = merge_consective_windows(results)
if len(merged_windows) > 0:
input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
output_stream = {"id": wireless_marker_stream_id, "name": dd_stream_name,
"algo_type": config["algo_type"]["sensor_unavailable_marker"]}
metadata = get_metadata(dd_stream_name, input_streams, config)
store(merged_windows, input_streams, output_stream, metadata, CC, config)
def analyze_quality(streams, owner_id, led_right_wrist_quality_stream_name, wrist, CC):
    # Reproducible quality-stream ID derived from the quality stream name and owner.
    led_stream_quality_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(
        led_right_wrist_quality_stream_name + owner_id + "LED quality computed on CerebralCortex"))
    if wrist == "right":
        stream_key = "LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"
    else:
        stream_key = "LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"
    if stream_key in streams:
        led_wrist_stream_id = streams[stream_key]["identifier"]
        led_wrist_stream_name = streams[stream_key]["name"]
    else:
        led_wrist_stream_id = None
if led_wrist_stream_id:
stream_end_days = CC.get_stream_start_end_time(led_wrist_stream_id)
if stream_end_days["start_time"] and stream_end_days["end_time"]:
days = stream_end_days["end_time"] - stream_end_days["start_time"]
for day in range(days.days + 1):
day = (stream_end_days["start_time"]+timedelta(days=day)).strftime('%Y%m%d')
stream = CC.get_datastream(led_wrist_stream_id, data_type=DataSet.COMPLETE, day=day)
if len(stream.data) > 0:
windowed_data = window(stream.data, 3, False)
led_quality_windows = data_quality_led(windowed_data)
input_streams = [{"owner_id": str(owner_id), "id": str(led_wrist_stream_id),
"name": led_wrist_stream_name}]
output_stream = {"id": str(led_stream_quality_id), "name": led_right_wrist_quality_stream_name, "algo_type": ""}
store(led_quality_windows, input_streams, output_stream, CC)
def migrate(folder_path: str, data_block_size):
"""
Migrate data from old CerebralCortex structure to new CerebralCortex structure
:param folder_path:
"""
configuration_file = os.path.join(os.path.dirname(__file__), '../../cerebralcortex.yml')
CC = CerebralCortex(configuration_file, master="local[*]", name="Data Migrator API", time_zone="US/Central", load_spark=True)
if not folder_path:
raise ValueError("Path to the data directory cannot be empty.")
for filename in glob.iglob(folder_path + '/**/*.json', recursive=True):
print(str(datetime.datetime.now()) + " -- Started processing file " + filename)
tmp = filename.split("/")
tmp = tmp[len(tmp) - 1].split("+")
owner_id = tmp[0]
stream_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(tmp[0] + " " + tmp[1])))
name = ''
for i in tmp[3:]:
name += i + " "
name = name.strip().replace(".json", "")
name = tmp[1] + " " + name
pm_algo_name = tmp[2]
data_filename = filename.replace(".json", ".csv.bz2")
old_schema = read_file(filename)
execution_context = get_execution_context(pm_algo_name, old_schema)
data_descriptor = get_data_descriptor(old_schema)
annotations = get_annotations()
print(str(datetime.datetime.now()) + " -- Schema building is complete ")
print(str(datetime.datetime.now()) + " -- Started unzipping file and adding records in Cassandra ")
for data_block in bz2file_to_datapoints(data_filename, data_block_size):
persist_data(execution_context, data_descriptor, annotations, stream_id, name, owner_id, data_block, CC)
print(str(datetime.datetime.now()) + " -- Completed processing file " + filename)
def generate_token(username):
    """Generate a short token based on the given username."""
    username = str(username)
    # Keep only the first six hex characters of the name-based UUID.
    token = str(uuid.uuid3(uuid.NAMESPACE_URL, username))[:6]
    return token
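Six hex characters give only 16**6 = 16,777,216 distinct tokens, so by the birthday bound a collision becomes more likely than not after roughly five thousand users; a quick check of that arithmetic:

import math

space = 16 ** 6                             # 6 hex chars = 24 bits
n_half = math.sqrt(2 * space * math.log(2))
print(int(n_half))                          # ~4823 tokens for a 50% collision chance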
def cluster_config(self):
"""
Provide the default configuration for a cluster
"""
if self.cluster:
cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
if not os.path.isdir(cluster_dir):
create_dirs(cluster_dir, self.root_dir)
filename = "{}/cluster.yml".format(cluster_dir)
        contents = {
            'fsid': str(uuid.uuid3(uuid.NAMESPACE_DNS, self.keyring_roles['admin'])),
            'admin_method': "default",
            'configuration_method': "default",
            'mds_method': "default",
            'mon_method': "default",
            'osd_method': "default",
            'package_method': "default",
            'pool_method': "default",
            'repo_method': "default",
            'rgw_method': "default",
            'update_method': "default",
            'public_network': self.public_network,
            'cluster_network': self.cluster_network,
        }
self.writer.write(filename, contents)
def get_host_info():
""" Returns an object with unique information about the host """
obj = {}
    # uuid.getnode() may return a randomly generated number when no hardware
    # address is available; uuid3 normalizes it into a stable UUID format.
    obj['uuid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(uuid.getnode())))
obj['host_name'] = socket.gethostname()
obj['operative_system'] = '{0}-{1}'.format(
platform.system(),
platform.release()
)
return obj
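When getnode() does fall back to a random number, the derived UUID changes between runs; a hedged alternative (not what this snippet does) is to hash the hostname, which stays stable across reboots at the cost of MAC-based uniqueness:

import socket
import uuid

# Sketch: derive the host ID from the hostname instead of uuid.getnode().
stable_host_id = uuid.uuid3(uuid.NAMESPACE_DNS, socket.gethostname())
print(stable_host_id)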
def _fabric_server_uuid(host, port):
"""Create a UUID using host and port"""
return uuid.uuid3(uuid.NAMESPACE_URL, _fabric_xmlrpc_uri(host, port))
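_fabric_xmlrpc_uri is referenced but not shown here; a minimal compatible sketch (an assumption, not the library's verbatim helper) makes clear that the server UUID is a pure function of host and port:

def _fabric_xmlrpc_uri(host, port):
    """Build the XML-RPC URI that the server UUID is derived from (sketch)."""
    return 'http://{0}:{1}'.format(host, port)

# The same host/port pair always maps to the same server UUID:
assert _fabric_server_uuid('fabric.local', 32274) == _fabric_server_uuid('fabric.local', 32274)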
def recognize(self, path, lang="zh-CN"):
    if not isinstance(path, str):
        return ["ERROR!", "File must be a path string."]
    if lang not in self.lang_list:
        return ["ERROR!", "Invalid language."]
    # The recognizer expects 16 kHz WAV, so transcode before uploading.
    audio = pydub.AudioSegment.from_file(path)
    audio = audio.set_frame_rate(16000)
    audio.export("%s.wav" % path, format="wav")
header = {
"Authorization": "Bearer %s" % self.access_token,
"Content-Type": "audio/wav; samplerate=16000"
}
d = {
"version": "3.0",
"requestid": str(uuid.uuid1()),
"appID": "D4D52672-91D7-4C74-8AD8-42B1D98141A5",
"format": "json",
"locale": lang,
"device.os": "Telegram",
"scenarios": "ulm",
"instanceid": uuid.uuid3(uuid.NAMESPACE_DNS, 'com.1a23.eh_telegram_master'),
"maxnbest": 5
}
with open("%s.wav" % path, 'rb') as f:
r = requests.post("https://speech.platform.bing.com/recognize", params=d, data=f.read(), headers=header)
os.remove("%s.wav" % path)
    try:
        rjson = r.json()
    except ValueError:
        # Non-JSON body usually means an HTTP-level error; surface it verbatim.
        return ["ERROR!", r.text]
if r.status_code == 200:
return [i['name'] for i in rjson['results']]
else:
return ["ERROR!", r.text]
def type_coerce(expr, type_):
    """Coerce the given expression into the given type,
    on the Python side only.

    :func:`.type_coerce` is roughly similar to :func:`.cast`, except no
    "CAST" expression is rendered - the given type is only applied towards
    expression typing and against received result values.

    e.g.::

        from sqlalchemy.types import TypeDecorator
        import uuid

        class AsGuid(TypeDecorator):
            impl = String

            def process_bind_param(self, value, dialect):
                if value is not None:
                    return str(value)
                else:
                    return None

            def process_result_value(self, value, dialect):
                if value is not None:
                    return uuid.UUID(value)
                else:
                    return None

        conn.execute(
            select([type_coerce(mytable.c.ident, AsGuid)]).\\
                where(
                    type_coerce(mytable.c.ident, AsGuid) ==
                    uuid.uuid3(uuid.NAMESPACE_URL, 'bar')
                )
        )

    """
type_ = sqltypes.to_instance(type_)
if hasattr(expr, '__clause_element__'):
return type_coerce(expr.__clause_element__(), type_)
elif isinstance(expr, BindParameter):
bp = expr._clone()
bp.type = type_
return bp
elif not isinstance(expr, Visitable):
if expr is None:
return null()
else:
return literal(expr, type_=type_)
else:
return Label(None, expr, type_=type_)