python类disk_usage()的实例源码

hostinfo.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def statvfs():
    docker_path = CONF.docker.root_directory
    if not os.path.exists(docker_path):
        docker_path = '/'
    return psutil.disk_usage(docker_path)
tendrl_gluster_brick_disk_stats.py 文件源码 项目:node-agent 作者: Tendrl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_disk_usage(self, device_name):
        return psutil.disk_usage(device_name)
__init__.py 文件源码 项目:Chasar 作者: camilochs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)

            }).encode()
            #send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])
            #time.sleep(0.500)

        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
__init__.py 文件源码 项目:Chasar 作者: camilochs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)

            }).encode()
            #send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])
            #time.sleep(0.500)

        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
hdd.py 文件源码 项目:telebot 作者: DronMDF 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def actions(self):
        if psutil.disk_usage('/').percent > self.level:
            return [AcLowHDD(self.chat_id)]
        return []
hdd.py 文件源码 项目:telebot 作者: DronMDF 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def asString(self):
        return '???? ????? ?? %u%%' % psutil.disk_usage('/').percent
api.py 文件源码 项目:django-sysinfo 作者: saxix 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def get_device_info(path):
    try:
        info = psutil.disk_usage(os.path.realpath(path))
        return {"total": humanize_bytes(info.total),
                "used": humanize_bytes(info.used),
                "free": humanize_bytes(info.free)}
    except OSError as e:
        return {"ERROR": str(e)}
system_utilities.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_data(self):
        """Gets system utilization stats."""
        # Get system utilization stats.
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        disk_percent = psutil.disk_usage('/').percent
        network_io = psutil.net_io_counters()
        # Compute deltas for sent and received bytes.  Note this is system-wide
        # network usage and not necessarily GPRS-related.
        # TODO(matt): query on a specific interface..which one, I'm not sure.
        if self.last_bytes_sent == 0:
            bytes_sent_delta = 0
        else:
            bytes_sent_delta = network_io.bytes_sent - self.last_bytes_sent
        self.last_bytes_sent = network_io.bytes_sent
        if self.last_bytes_received == 0:
            bytes_received_delta = 0
        else:
            bytes_received_delta = (
                network_io.bytes_recv - self.last_bytes_received)
        self.last_bytes_received = network_io.bytes_recv
        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory_percent,
            'disk_percent': disk_percent,
            'bytes_sent_delta': bytes_sent_delta,
            'bytes_received_delta': bytes_received_delta,
        }
download.py 文件源码 项目:Planet-GEE-Pipeline-CLI 作者: samapriya 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def process_size(path, id_list, item_type, asset_type, overwrite):
    results = []
    summation=0
    path= args.size
    spc=psutil.disk_usage(path).free
    remain=float(spc)/1073741824
    # now start downloading each file
    for item_id in id_list:
        url = ASSET_URL.format(item_type, item_id)
        logging.info('Request: {}'.format(url))
        result = SESSION.get(url)
        check_status(result)
        try:
            if result.json()[asset_type]['status'] == 'active':
                download_url = result.json()[asset_type]['location']
                #print(download_url)
                pool = PoolManager()
                response = pool.request("GET", download_url, preload_content=False)
                max_bytes = 100000000000
                content_bytes = response.headers.get("Content-Length")
                print("Item-ID: "+str(item_id))
                #print(int(content_bytes)/1048576,"MB")
                summary=float(content_bytes)/1073741824
                summation=summation+summary
                #print ("Total Size in MB",summation)
            else:
                result = False
        except KeyError:
            print('Could not check activation status - asset type \'{}\' not found for {}'.format(asset_type, item_id))
            result = False


        results.append(result)
    #print(remain,"MB")
    print("Remaining Space in MB",format(float(remain*1024),'.2f'))
    print("Remaining Space in GB",format(float(remain),'.2f'))
    print ("Total Size in MB",format(float(summation*1024),'.2f'))
    print ("Total Size in GB",format(float(summation),'.2f'))
    return results
ServerCommand.py 文件源码 项目:aetros-cli 作者: aetros 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def collect_system_information(self):
        values = {}
        mem = psutil.virtual_memory()
        values['memory_total'] = mem.total

        import cpuinfo
        cpu = cpuinfo.get_cpu_info()
        values['resources_limit'] = self.resources_limit
        values['cpu_name'] = cpu['brand']
        values['cpu'] = [cpu['hz_advertised_raw'][0], cpu['count']]
        values['nets'] = {}
        values['disks'] = {}
        values['gpus'] = {}
        values['boot_time'] = psutil.boot_time()

        try:
            for gpu_id, gpu in enumerate(aetros.cuda_gpu.get_ordered_devices()):
                gpu['available'] = gpu_id in self.enabled_gpus

                values['gpus'][gpu_id] = gpu
        except Exception: pass

        for disk in psutil.disk_partitions():
            try:
                name = self.get_disk_name(disk[1])
                values['disks'][name] = psutil.disk_usage(disk[1]).total
            except Exception:
                # suppress Operation not permitted
                pass

        try:
            for id, net in psutil.net_if_stats().items():
                if 0 != id.find('lo') and net.isup:
                    self.nets.append(id)
                    values['nets'][id] = net.speed or 1000
        except Exception:
            # suppress Operation not permitted
            pass

        return values
stats.py 文件源码 项目:psystem 作者: gokhanm 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def partition(self, partition):
        """
            System disk partition usage

            Return psutil class
        """
        usage = psutil.disk_usage(str(partition))

        return usage
test_memory_leaks.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_disk_usage(self):
        self.execute(psutil.disk_usage, '.')
test_osx.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.free, free)
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.used, used)
test_misc.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def test_disk_usage(self):
        psutil.disk_usage(self.udir)
test_system.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_disk_usage(self):
        usage = psutil.disk_usage(os.getcwd())
        assert usage.total > 0, usage
        assert usage.used > 0, usage
        assert usage.free > 0, usage
        assert usage.total > usage.used, usage
        assert usage.total > usage.free, usage
        assert 0 <= usage.percent <= 100, usage.percent
        if hasattr(shutil, 'disk_usage'):
            # py >= 3.3, see: http://bugs.python.org/issue12442
            shutil_usage = shutil.disk_usage(os.getcwd())
            tolerance = 5 * 1024 * 1024  # 5MB
            self.assertEqual(usage.total, shutil_usage.total)
            self.assertAlmostEqual(usage.free, shutil_usage.free,
                                   delta=tolerance)
            self.assertAlmostEqual(usage.used, shutil_usage.used,
                                   delta=tolerance)

        # if path does not exist OSError ENOENT is expected across
        # all platforms
        fname = tempfile.mktemp()
        try:
            psutil.disk_usage(fname)
        except OSError as err:
            if err.args[0] != errno.ENOENT:
                raise
        else:
            self.fail("OSError not raised")
test_system.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_disk_usage_unicode(self):
        # see: https://github.com/giampaolo/psutil/issues/416
        safe_rmpath(TESTFN_UNICODE)
        self.addCleanup(safe_rmpath, TESTFN_UNICODE)
        os.mkdir(TESTFN_UNICODE)
        psutil.disk_usage(TESTFN_UNICODE)
test_posix.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_disk_usage(self):
        def df(device):
            out = sh("df -k %s" % device).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                if "no such file or directory" in str(err).lower() or \
                        "raw devices not supported" in str(err).lower():
                    continue
                else:
                    raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
aor.py 文件源码 项目:Platypus 作者: gmemstr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def GetStats():
    s = {}
    s["cpu"] = round(psutil.cpu_percent())  # Used CPU
    s["memory"] = round(psutil.virtual_memory().percent)  # Used memory
    s["disk"] = round(psutil.disk_usage('C:\\').percent)  # Used disk
    return s
host.py 文件源码 项目:JimV-N 作者: jamesiter 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def update_disks(self):
        self.disks.clear()
        for disk in psutil.disk_partitions(all=False):
            disk_usage = psutil.disk_usage(disk.mountpoint)
            self.disks[disk.mountpoint] = {'device': disk.device, 'real_device': disk.device, 'fstype': disk.fstype,
                                           'opts': disk.opts, 'total': disk_usage.total, 'used': disk_usage.used,
                                           'free': disk_usage.free, 'percent': disk_usage.percent}

            if os.path.islink(disk.device):
                self.disks[disk.mountpoint]['real_device'] = os.path.realpath(disk.device)

    # ?????????????? ??? ???


问题


面经


文章

微信
公众号

扫码关注公众号