python类uuid3()的实例源码

utilities.py 文件源码 项目:blender_addons_collection 作者: manorius 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def id_from_name(name):
    """Generate a UUID using a name as the namespace

    :type name: str
    :rtype: str

    """
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, name)).upper()
tools_for_tests.py 文件源码 项目:gougo 作者: amaozhao 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def create_uuid(self):
        if not self.version or self.version == 4:
            return uuid.uuid4()
        elif self.version == 1:
            return uuid.uuid1(self.node, self.clock_seq)
        elif self.version == 2:
            raise UUIDVersionError("UUID version 2 is not supported.")
        elif self.version == 3:
            return uuid.uuid3(self.namespace, self.name)
        elif self.version == 5:
            return uuid.uuid5(self.namespace, self.name)
        else:
            raise UUIDVersionError("UUID version %s is not valid." % self.version)
test_timeuuid.py 文件源码 项目:komlogd 作者: komlog-io 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_create_timeuuid_with_uuid4_string_should_fail(self):
        ''' creating a TimeUUID with a hex uuid4 should fail'''
        for i in range(1,100):
            u = uuid.uuid4()
            with self.assertRaises(ValueError) as cm:
                t = timeuuid.TimeUUID(s=u.hex)
            self.assertEqual(str(cm.exception), 'Invalid UUID type')
        for fn in [uuid.uuid3, uuid.uuid5]:
            for i in range(1,100):
                u = fn(uuid.NAMESPACE_DNS,str(os.urandom(10)))
                with self.assertRaises(ValueError) as cm:
                    t = timeuuid.TimeUUID(s=u.hex)
                self.assertEqual(str(cm.exception), 'Invalid UUID type')
accesstoken.py 文件源码 项目:mltshp 作者: MLTSHP 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def generate(authorization_id):
        """
        Generate an access token based on an (unexpired) authorization id.
        """
        auth = authorizationcode.Authorizationcode.get('id=%s', authorization_id)
        consumer_key = uuid.uuid3(uuid.NAMESPACE_DNS, base36encode(auth.id) + '-' + base36encode(auth.app_id))
        consumer_secret = sha224("%s%s" % (str(uuid.uuid1()), time.time())).hexdigest()

        if auth.expires_at > datetime.utcnow():
            access_token = Accesstoken(user_id=auth.user_id, app_id=auth.app_id, consumer_key=str(consumer_key), consumer_secret=str(consumer_secret))
            access_token.save()
            return access_token
        else:
            return None
udocker.py 文件源码 项目:udocker 作者: indigo-dc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def uuid(self, name):
        """Get an ID"""
        if not name:
            name = self.def_name
        try:
            return str(uuid.uuid3(uuid.uuid4(), str(name) + str(time.time())))
        except (NameError, AttributeError):
            return(("%s-%s-%s-%s-%s") %
                   (self._rnd(8), self._rnd(4), self._rnd(4),
                    self._rnd(4), self._rnd(12)))
udocker.py 文件源码 项目:udocker 作者: indigo-dc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def filename(self, filename):
        """Get a filename"""
        prefix = self.def_name + "-" + str(os.getpid()) + "-"
        try:
            return(prefix +
                   str(uuid.uuid3(uuid.uuid4(), str(time.time()))) +
                   "-" + str(filename))
        except (NameError, AttributeError):
            return prefix + self.uuid(filename) + "-" + str(filename)
util.py 文件源码 项目:monique 作者: monique-dashboards 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def uuid_for_string(s):
    return uuid.uuid3(uuid.NAMESPACE_OID, s)
external_devices_layer.py 文件源码 项目:jf 作者: rolycg 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_mount_point(block):
    global collection
    global messages
    bus = dbus.SystemBus()
    obj = bus.get_object('org.freedesktop.UDisks2', block)
    iface = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')  # Here we use this 'magic' interface
    dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    mount_point = ''
    while not len(dbus_mount_point):
        time.sleep(0.5)
        dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    dbus_id = iface.Get('org.freedesktop.UDisks2.Block', 'Id')
    dbus_name = iface.Get('org.freedesktop.UDisks2.Block', 'IdLabel')
    dbus_space = iface.Get('org.freedesktop.UDisks2.Block', 'Size')
    for letter in dbus_mount_point[0]:
        mount_point += chr(letter)
    if not dbus_name:
        dbus_name = mount_point[:-1].split(os.sep)
        dbus_name = dbus_name[len(dbus_name) - 1]
    if not dbus_id:
        dbus_id = uuid.uuid3(uuid.uuid4(), dbus_name)
    collection[block] = [str(mount_point[:-1]), str(dbus_id), str(dbus_name), None, None, dbus_space]
    messages.append(
        'You have a new device connected (' + dbus_name + ', ' + extra_functions.convert_to_human_readable(
            dbus_space) + '). To have JF track it, execute:' + '\n' + '      jf ' + '-i ' + str(mount_point[:-1]))
    return dbus_id, block, dbus_name
motionsense.py 文件源码 项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def attachment_marker(raw_stream_id: uuid, stream_name: str, owner_id: uuid, dd_stream_name, CC: CerebralCortex,
                      config: dict):
    """
    Label sensor data as sensor-on-body, sensor-off-body, or improper-attachment.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore

    """
    # TODO: quality streams could be multiple so find the one computed with CC
    # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
    attachment_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

    stream_days = get_stream_days(raw_stream_id, attachment_marker_stream_id, CC)

    for day in stream_days:
        # load stream data to be diagnosed
        raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)

        if len(raw_stream.data) > 0:
            windowed_data = window(raw_stream.data, config['general']['window_size'], True)
            results = process_windows(windowed_data, config)
            merged_windows = merge_consective_windows(results)

            input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
            output_stream = {"id": attachment_marker_stream_id, "name": dd_stream_name,
                             "algo_type": config["algo_type"]["attachment_marker"]}
            metadata = get_metadata(dd_stream_name, input_streams, config)
            store(merged_windows, input_streams, output_stream, metadata, CC, config)
phone_screen_touch.py 文件源码 项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def phone_screen_touch_marker(raw_stream_id: uuid, raw_stream_name: str, owner_id, dd_stream_name, CC: CerebralCortex,
                              config: dict, start_time=None, end_time=None):
    """
    This is not part of core data diagnostic suite.
    It only calculates how many screen touches are there.
    :param raw_stream_id:
    :param CC:
    :param config:
    """

    try:
        # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
        screen_touch_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(
            raw_stream_id + dd_stream_name + owner_id + "mobile phone screen touch marker"))

        stream_days = get_stream_days(raw_stream_id, screen_touch_stream_id, CC)

        for day in stream_days:
            stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day, start_time=start_time,
                                       end_time=end_time)
            if len(stream.data) > 0:
                windowed_data = window(stream.data, config['general']['window_size'], True)
                results = process_windows(windowed_data)

                merged_windows = merge_consective_windows(results)
                if len(merged_windows) > 0:
                    input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": raw_stream_name}]
                    output_stream = {"id": screen_touch_stream_id, "name": dd_stream_name,
                                     "algo_type": config["algo_type"]["app_availability_marker"]}
                    metadata = get_metadata(dd_stream_name, input_streams, config)
                    store(merged_windows, input_streams, output_stream, metadata, CC, config)

    except Exception as e:
        print(e)
battery_data_marker.py 文件源码 项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def battery_marker(raw_stream_id: uuid, stream_name: str, owner_id, dd_stream_name, CC: CerebralCortex, config: dict,
                   start_time=None, end_time=None):
    """
    This algorithm uses battery percentages to decide whether device was powered-off or battery was low.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore.
    :param raw_stream_id:
    :param CC:
    :param config:
    """

    try:
        # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
        battery_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

        stream_days = get_stream_days(raw_stream_id, battery_marker_stream_id, CC)

        for day in stream_days:
            stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day)

            if len(stream.data) > 0:
                windowed_data = window(stream.data, config['general']['window_size'], True)
                results = process_windows(windowed_data, stream_name, config)

                merged_windows = merge_consective_windows(results)
                if len(merged_windows) > 0:
                    input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
                    output_stream = {"id": battery_marker_stream_id, "name": dd_stream_name,
                                     "algo_type": config["algo_type"]["battery_marker"]}
                    labelled_windows = mark_windows(battery_marker_stream_id, merged_windows, CC, config)
                    metadata = get_metadata(dd_stream_name, input_streams, config)
                    store(labelled_windows, input_streams, output_stream, metadata, CC, config)
    except Exception as e:
        print(e)
motionsense.py 文件源码 项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def sensor_availability(raw_stream_id: uuid, stream_name: str, owner_id: uuid, dd_stream_name,
                        phone_physical_activity, CC: CerebralCortex, config: dict):
    """
    Mark missing data as wireless disconnection if a participate walks away from phone or sensor
    :param raw_stream_id:
    :param stream_name:
    :param owner_id:
    :param dd_stream_name:
    :param phone_physical_activity:
    :param CC:
    :param config:
    """

    # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
    wireless_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

    stream_days = get_stream_days(raw_stream_id, wireless_marker_stream_id, CC)

    for day in stream_days:
        # load stream data to be diagnosed
        raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)
        if len(raw_stream.data) > 0:

            windowed_data = window(raw_stream.data, config['general']['window_size'], True)
            results = process_windows(windowed_data, day, CC, phone_physical_activity, config)
            merged_windows = merge_consective_windows(results)

            if len(merged_windows) > 0:
                input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
                output_stream = {"id": wireless_marker_stream_id, "name": dd_stream_name,
                                 "algo_type": config["algo_type"]["sensor_unavailable_marker"]}
                metadata = get_metadata(dd_stream_name, input_streams, config)
                store(merged_windows, input_streams, output_stream, metadata, CC, config)
main.py 文件源码 项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def analyze_quality(streams, owner_id, led_right_wrist_quality_stream_name, wrist, CC):
    led_stream_quality_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(led_right_wrist_quality_stream_name + owner_id+"LED quality computed on CerebralCortex"))
    if wrist=="right":
        if "LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST" in streams:
            led_wrist_stream_id = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"][
                "identifier"]
            led_wrist_stream_name = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"]["name"]
        else:
            led_wrist_stream_id = None
    else:
        if "LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST" in streams:
            led_wrist_stream_id = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"][
                "identifier"]
            led_wrist_stream_name = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"]["name"]
        else:
            led_wrist_stream_id = None

    if led_wrist_stream_id:
        stream_end_days = CC.get_stream_start_end_time(led_wrist_stream_id)
        if stream_end_days["start_time"] and stream_end_days["end_time"]:
            days = stream_end_days["end_time"] - stream_end_days["start_time"]
            for day in range(days.days + 1):
                day = (stream_end_days["start_time"]+timedelta(days=day)).strftime('%Y%m%d')
                stream = CC.get_datastream(led_wrist_stream_id, data_type=DataSet.COMPLETE, day=day)
                if len(stream.data) > 0:
                    windowed_data = window(stream.data, 3, False)
                    led_quality_windows = data_quality_led(windowed_data)

                    input_streams = [{"owner_id": str(owner_id), "id": str(led_wrist_stream_id),
                                      "name": led_wrist_stream_name}]
                    output_stream = {"id": str(led_stream_quality_id), "name": led_right_wrist_quality_stream_name, "algo_type": ""}

                    store(led_quality_windows, input_streams, output_stream, CC)
main.py 文件源码 项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def migrate(folder_path: str, data_block_size):
    """
    Migrate data from old CerebralCortex structure to new CerebralCortex structure
    :param folder_path:
    """

    configuration_file = os.path.join(os.path.dirname(__file__), '../../cerebralcortex.yml')
    CC = CerebralCortex(configuration_file, master="local[*]", name="Data Migrator API", time_zone="US/Central", load_spark=True)

    if not folder_path:
        raise ValueError("Path to the data directory cannot be empty.")

    for filename in glob.iglob(folder_path + '/**/*.json', recursive=True):
        print(str(datetime.datetime.now()) + " -- Started processing file " + filename)

        tmp = filename.split("/")
        tmp = tmp[len(tmp) - 1].split("+")
        owner_id = tmp[0]
        stream_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(tmp[0] + " " + tmp[1])))

        name = ''
        for i in tmp[3:]:
            name += i + " "

        name = name.strip().replace(".json", "")
        name = tmp[1] + " " + name

        pm_algo_name = tmp[2]

        data_filename = filename.replace(".json", ".csv.bz2")
        old_schema = read_file(filename)
        execution_context = get_execution_context(pm_algo_name, old_schema)
        data_descriptor = get_data_descriptor(old_schema)
        annotations = get_annotations()
        print(str(datetime.datetime.now()) + " -- Schema building is complete ")
        print(str(datetime.datetime.now()) + " -- Started unzipping file and adding records in Cassandra ")
        for data_block in bz2file_to_datapoints(data_filename, data_block_size):
            persist_data(execution_context, data_descriptor, annotations, stream_id, name, owner_id, data_block, CC)
        print(str(datetime.datetime.now()) + " -- Completed processing file " + filename)
auth.py 文件源码 项目:videoplusplus 作者: ArthurChiao 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def generate_token(username):
    """generate a short token based on given username"""
    username = str(username)
    token = str(uuid.uuid3(uuid.NAMESPACE_URL, username))[:6]
    return token
populate.py 文件源码 项目:pillar-prototype 作者: swiftgist 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def cluster_config(self):
        """
        Provide the default configuration for a cluster
        """
        if self.cluster:
            cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
            if not os.path.isdir(cluster_dir):
                 create_dirs(cluster_dir, self.root_dir)
            filename = "{}/cluster.yml".format(cluster_dir)
            contents = {}
            contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, self.keyring_roles['admin']))
            contents['admin_method'] = "default"
            contents['configuration_method'] = "default"
            contents['mds_method'] = "default"
            contents['mon_method'] = "default"
            contents['osd_method'] = "default"
            contents['package_method'] = "default"
            contents['pool_method'] = "default"
            contents['repo_method'] = "default"
            contents['rgw_method'] = "default"
            contents['update_method'] = "default"

            contents['public_network'] = self.public_network
            contents['cluster_network'] = self.cluster_network

            self.writer.write(filename, contents)
Worker.py 文件源码 项目:bluesteel 作者: imvu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_host_info():
    """ Returns an object with unique information about the host """
    obj = {}
    # uuid.getnode() can return a random number, we need to fix it
    obj['uuid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(uuid.getnode())))
    obj['host_name'] = socket.gethostname()
    obj['operative_system'] = '{0}-{1}'.format(
        platform.system(),
        platform.release()
    )
    return obj
connection.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _fabric_server_uuid(host, port):
    """Create a UUID using host and port"""
    return uuid.uuid3(uuid.NAMESPACE_URL, _fabric_xmlrpc_uri(host, port))
speech.py 文件源码 项目:ehForwarderBot 作者: blueset 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def recognize(self, path, lang="zh-CN"):
        if isinstance(path, str):
            file = open(path, 'rb')
        else:
            return ["ERROR!", "File must by a path string."]
        if lang not in self.lang_list:
            return ["ERROR!", "Invalid language."]

        audio = pydub.AudioSegment.from_file(file)
        audio = audio.set_frame_rate(16000)
        audio.export("%s.wav" % path, format="wav")
        header = {
            "Authorization": "Bearer %s" % self.access_token,
            "Content-Type": "audio/wav; samplerate=16000"
        }
        d = {
            "version": "3.0",
            "requestid": str(uuid.uuid1()),
            "appID": "D4D52672-91D7-4C74-8AD8-42B1D98141A5",
            "format": "json",
            "locale": lang,
            "device.os": "Telegram",
            "scenarios": "ulm",
            "instanceid": uuid.uuid3(uuid.NAMESPACE_DNS, 'com.1a23.eh_telegram_master'),
            "maxnbest": 5
        }
        with open("%s.wav" % path, 'rb') as f:
            r = requests.post("https://speech.platform.bing.com/recognize", params=d, data=f.read(), headers=header)
        os.remove("%s.wav" % path)
        try:
            rjson = r.json()
        except:
            return ["ERROR!", r.text]
        if r.status_code == 200:
            return [i['name'] for i in rjson['results']]
        else:
            return ["ERROR!", r.text]
expression.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def type_coerce(expr, type_):
    """Coerce the given expression into the given type,
    on the Python side only.

    :func:`.type_coerce` is roughly similar to :func:`.cast`, except no
    "CAST" expression is rendered - the given type is only applied towards
    expression typing and against received result values.

    e.g.::

        from sqlalchemy.types import TypeDecorator
        import uuid

        class AsGuid(TypeDecorator):
            impl = String

            def process_bind_param(self, value, dialect):
                if value is not None:
                    return str(value)
                else:
                    return None

            def process_result_value(self, value, dialect):
                if value is not None:
                    return uuid.UUID(value)
                else:
                    return None

        conn.execute(
            select([type_coerce(mytable.c.ident, AsGuid)]).\\
                    where(
                        type_coerce(mytable.c.ident, AsGuid) ==
                        uuid.uuid3(uuid.NAMESPACE_URL, 'bar')
                    )
        )

    """
    type_ = sqltypes.to_instance(type_)

    if hasattr(expr, '__clause_element__'):
        return type_coerce(expr.__clause_element__(), type_)
    elif isinstance(expr, BindParameter):
        bp = expr._clone()
        bp.type = type_
        return bp
    elif not isinstance(expr, Visitable):
        if expr is None:
            return null()
        else:
            return literal(expr, type_=type_)
    else:
        return Label(None, expr, type_=type_)


问题


面经


文章

微信
公众号

扫码关注公众号