Python 函数 psutil.disk_io_counters() 的实例源码

test_memory_leaks.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_disk_io_counters(self):
        """Pass psutil.disk_io_counters to the self.execute harness.

        self.execute is defined outside this snippet; presumably it is the
        leak-checking runner of test_memory_leaks.py -- confirm there.
        """
        target = psutil.disk_io_counters
        self.execute(target)

    # --- proc
test_linux.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_disk_io_counters_kernel_2_4_mocked(self):
        """Check disk_io_counters() against a mocked 2.4-kernel diskstats line.

        The built-in open() is patched so that /proc/partitions and
        /proc/diskstats yield canned text, while every other path is
        opened for real. Background:
        https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Only the two /proc files are faked; a fresh buffer is
            # handed out on every call.
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3     0   1 hda 2 3 4 5 6 7 8 9 10 11 12"))
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            return real_open(name, *args, **kwargs)

        # The builtins module is named differently on Python 2 and 3.
        if PY3:
            target = 'builtins.open'
        else:
            target = '__builtin__.open'

        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            self.assertEqual(counters.read_count, 1)
            self.assertEqual(counters.read_merged_count, 2)
            self.assertEqual(counters.read_bytes, 3 * SECTOR_SIZE)
            self.assertEqual(counters.read_time, 4)
            self.assertEqual(counters.write_count, 5)
            self.assertEqual(counters.write_merged_count, 6)
            self.assertEqual(counters.write_bytes, 7 * SECTOR_SIZE)
            self.assertEqual(counters.write_time, 8)
            self.assertEqual(counters.busy_time, 10)
test_linux.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_disk_io_counters_kernel_2_6_full_mocked(self):
        """Check disk_io_counters() against a mocked 2.6-kernel diskstats
        line that reports the full set of metrics.

        The built-in open() is patched so that /proc/partitions and
        /proc/diskstats yield canned text, while every other path is
        opened for real. Background:
        https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Only the two /proc files are faked; a fresh buffer is
            # handed out on every call.
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"))
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            return real_open(name, *args, **kwargs)

        # The builtins module is named differently on Python 2 and 3.
        if PY3:
            target = 'builtins.open'
        else:
            target = '__builtin__.open'

        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            self.assertEqual(counters.read_count, 1)
            self.assertEqual(counters.read_merged_count, 2)
            self.assertEqual(counters.read_bytes, 3 * SECTOR_SIZE)
            self.assertEqual(counters.read_time, 4)
            self.assertEqual(counters.write_count, 5)
            self.assertEqual(counters.write_merged_count, 6)
            self.assertEqual(counters.write_bytes, 7 * SECTOR_SIZE)
            self.assertEqual(counters.write_time, 8)
            self.assertEqual(counters.busy_time, 10)
test_linux.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
        """Check disk_io_counters() against a mocked 2.6-kernel diskstats
        line that carries only four counters.

        Such short lines are what the kernel reports for a partition
        (as opposed to a whole disk). The built-in open() is patched so
        that /proc/partitions and /proc/diskstats yield canned text,
        while every other path is opened for real. Background:
        https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Only the two /proc files are faked; a fresh buffer is
            # handed out on every call.
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    1   hda 1 2 3 4"))
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            return real_open(name, *args, **kwargs)

        # The builtins module is named differently on Python 2 and 3.
        if PY3:
            target = 'builtins.open'
        else:
            target = '__builtin__.open'

        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            # The four counters present in the fixture line.
            self.assertEqual(counters.read_count, 1)
            self.assertEqual(counters.read_bytes, 2 * SECTOR_SIZE)
            self.assertEqual(counters.write_count, 3)
            self.assertEqual(counters.write_bytes, 4 * SECTOR_SIZE)

            # Everything the short line does not carry is expected as 0.
            self.assertEqual(counters.read_merged_count, 0)
            self.assertEqual(counters.read_time, 0)
            self.assertEqual(counters.write_merged_count, 0)
            self.assertEqual(counters.write_time, 0)
            self.assertEqual(counters.busy_time, 0)


# =====================================================================
# --- misc
# =====================================================================
test_system.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_disk_io_counters(self):
        """Sanity-check psutil.disk_io_counters() in both aggregate and
        per-disk mode: positional and named field access must agree, and
        no counter may be negative.
        """
        def verify_fields(nt):
            # Fields present on every platform.
            self.assertEqual(nt[0], nt.read_count)
            self.assertEqual(nt[1], nt.write_count)
            self.assertEqual(nt[2], nt.read_bytes)
            self.assertEqual(nt[3], nt.write_bytes)
            # Timing fields are skipped on OpenBSD and NetBSD.
            if not OPENBSD and not NETBSD:
                self.assertEqual(nt[4], nt.read_time)
                self.assertEqual(nt[5], nt.write_time)
                if LINUX:
                    self.assertEqual(nt[6], nt.read_merged_count)
                    self.assertEqual(nt[7], nt.write_merged_count)
                    self.assertEqual(nt[8], nt.busy_time)
                elif FREEBSD:
                    self.assertEqual(nt[6], nt.busy_time)
            for field in nt._fields:
                assert getattr(nt, field) >= 0, nt

        verify_fields(psutil.disk_io_counters(perdisk=False))

        per_disk = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        self.assertEqual(len(per_disk), len(set(per_disk)))
        for disk_name in per_disk:
            assert disk_name, disk_name
            verify_fields(per_disk[disk_name])
            if LINUX and disk_name[-1].isdigit():
                # When a partition such as 'sda1' is listed, its parent
                # disk 'sda' must not be listed as well, see:
                # https://github.com/giampaolo/psutil/issues/338
                parent = disk_name
                while parent[-1].isdigit():
                    parent = parent[:-1]
                self.assertNotIn(parent, per_disk.keys())

    # can't find users on APPVEYOR or TRAVIS
instanceMonitor.py 文件源码 项目:rune 作者: hoonkim 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def GetSystemState(self):
        """Refresh this object's snapshot of the host's state.

        Sets the following attributes on ``self``:
            address     -- local IP address of the socket after connecting
                           it towards 8.8.8.8
            cpu_count   -- result of ``psutil.cpu_count()``
            cpu_percent -- per-CPU usage sampled over a one-second interval
            mem         -- result of ``psutil.virtual_memory()``
            disk_usage  -- result of ``psutil.disk_usage('/')``
            disk_io     -- result of ``psutil.disk_io_counters()``
            net_io      -- result of ``psutil.net_io_counters()``
            hostname    -- result of ``socket.gethostname()``

        Raises:
            Whatever the socket or psutil calls raise (e.g. ``OSError`` when
            the connect fails); errors are not handled here.
        """
        # Connecting a datagram socket transmits nothing; it only makes the
        # OS bind the socket to the local address it would use for that peer.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(('8.8.8.8', 0))
            self.address = s.getsockname()[0]
        finally:
            # Always release the descriptor: this method can be called
            # repeatedly and previously leaked one socket per call.
            s.close()
        self.cpu_count = psutil.cpu_count()
        # interval=1 makes this call block for about one second.
        self.cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
        self.mem = psutil.virtual_memory()
        self.disk_usage = psutil.disk_usage('/')
        self.disk_io = psutil.disk_io_counters()
        self.net_io = psutil.net_io_counters()
        self.hostname = socket.gethostname()
instanceMonitor.py 文件源码 项目:rune 作者: hoonkim 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def GetSystemState(self):
        """Refresh this object's snapshot of the host's state.

        Sets the following attributes on ``self``:
            address     -- local IP address of the socket after connecting
                           it towards 8.8.8.8
            cpu_count   -- result of ``psutil.cpu_count()``
            cpu_percent -- per-CPU usage sampled over a one-second interval
            mem         -- result of ``psutil.virtual_memory()``
            disk_usage  -- result of ``psutil.disk_usage('/')``
            disk_io     -- result of ``psutil.disk_io_counters()``
            net_io      -- result of ``psutil.net_io_counters()``
            hostname    -- result of ``socket.gethostname()``

        Raises:
            Whatever the socket or psutil calls raise (e.g. ``OSError`` when
            the connect fails); errors are not handled here.
        """
        # Connecting a datagram socket transmits nothing; it only makes the
        # OS bind the socket to the local address it would use for that peer.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(('8.8.8.8', 0))
            self.address = s.getsockname()[0]
        finally:
            # Always release the descriptor: this method can be called
            # repeatedly and previously leaked one socket per call.
            s.close()
        self.cpu_count = psutil.cpu_count()
        # interval=1 makes this call block for about one second.
        self.cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
        self.mem = psutil.virtual_memory()
        self.disk_usage = psutil.disk_usage('/')
        self.disk_io = psutil.disk_io_counters()
        self.net_io = psutil.net_io_counters()
        self.hostname = socket.gethostname()
mysql_watcher.py 文件源码 项目:MySQL_Watcher 作者: kinghows 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def f_print_linux_status(save_as):
    """Print a "Linux Overview" table of CPU, memory, swap and load figures.

    CPU usage is sampled with ``psutil.cpu_percent(interval=1)``, so the
    call blocks for about one second.

    Args:
        save_as: ``"txt"`` renders through ``f_print_title`` and
            ``f_print_table_body``; ``"html"`` renders through
            ``f_print_table_html``. Any other value still collects the
            figures but prints nothing.
    """
    # --- collect the figures ---------------------------------------------
    # scputimes(user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice)
    cpu_times = psutil.cpu_times()
    # svmem(total, available, percent, used, free, active, inactive, buffers, cached, shared)
    mem = psutil.virtual_memory()
    # sswap(total, used, free, percent, sin, sout)
    swap = psutil.swap_memory()

    # os.getloadavg() raises OSError when the load average is unobtainable
    # and does not exist at all (AttributeError) on platforms without it.
    # Fall back to a placeholder so the rows below can always be built; the
    # previous empty-dict fallback made stats["min1"] raise KeyError.
    try:
        load = os.getloadavg()
    except (OSError, AttributeError):
        load = ('N/A', 'N/A', 'N/A')
    stats = {'min1': load[0], 'min5': load[1], 'min15': load[2]}

    def _mb(n_bytes):
        # Byte count rendered as megabytes with an 'M' suffix.
        return str(n_bytes/1024/1024) + 'M'

    # --- lay out and render the table --------------------------------------
    # Column index -> format spec consumed by the f_print_table_* helpers
    # (defined elsewhere in the file).
    style1 = {1: ' ,6,l', 2: ' ,10,r',3: ' ,6,l', 4: ' ,10,r',5: ' ,6,l', 6: ' ,6,r',7: ' ,8,l',8: ' ,6,r',9: ' ,6,l', 10: ' ,6,r',11: ' ,6,l', 12: ' ,5,r',}
    style = {1: ' ,l', 2: ' ,r',3: ' ,l', 4: ' ,r',5: ' ,l', 6: ' ,r',7: ' ,l',8: ' ,r',9: ' ,l', 10: ' ,r',11: ' ,l', 12: ' ,r',}
    # Each row is six label/value pairs: CPU, CPU, memory, memory, swap, load.
    rows = [
        ["CPU", str(psutil.cpu_percent(interval=1))+'%',
         "nice", cpu_times.nice,
         "MEM", str(mem.percent) + '%',
         "active", _mb(mem.active),
         "SWAP", str(swap.percent)+'%',
         "LOAD", str(psutil.cpu_count())+'core'],
        ["user", cpu_times.user,
         "irq", cpu_times.irq,
         "total", _mb(mem.total),
         "inactive", _mb(mem.inactive),
         "total", _mb(swap.total),
         "1 min", stats["min1"]],
        ["system", cpu_times.system,
         "iowait", cpu_times.iowait,
         "used", _mb(mem.used),
         "buffers", _mb(mem.buffers),
         "used", _mb(swap.used),
         "5 min", stats["min5"]],
        ["idle", cpu_times.idle,
         "steal", cpu_times.steal,
         "free", _mb(mem.free),
         "cached", _mb(mem.cached),
         "free", _mb(swap.free),
         "15 min", stats["min15"]],
    ]

    title = "Linux Overview"
    if save_as == "txt":
        f_print_title(title)
        f_print_table_body(rows, style1,' ')
    elif save_as == "html":
        f_print_table_html(rows, title, style)
check.py 文件源码 项目:icinga2checks 作者: c-store 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_disk(i_warning, i_critical, s_partition):
    """
    Builds the disk check result: status text followed by perfdata

    Gets:
        i_warning: Warning threshold, handed to check_status
        i_critical: Critical threshold, handed to check_status
        s_partition: mountpoint whose usage percentage decides the status

    Returns:
        the output text, then ' | ', then the collected perfdata
    """
    test_int(i_warning, i_critical)
    test_string(s_partition)

    s_perfdata = ''
    s_output = ''
    f_monitored_partition_usage = 0.0
    l_partitions = psutil.disk_partitions()
    d_io_counters = psutil.disk_io_counters(perdisk=True)

    # usage of every partition goes into perfdata; only the monitored
    # mountpoint contributes the status text
    for nt_part in l_partitions:
        s_mount = nt_part.mountpoint
        d_usage = psutil.disk_usage(s_mount)._asdict()
        for s_key, value in d_usage.items():
            s_perfdata = add_perfdata(s_perfdata, s_mount, s_key, value)

        if s_mount != s_partition:
            continue
        s_output = check_status(i_warning, i_critical, d_usage['percent'])
        f_monitored_partition_usage = d_usage['percent']

    # explain the usage whenever the status text does not contain 'OK'
    if 'OK' not in s_output:
        s_output += ' {} has a usage of {} percent.'.format(s_partition, f_monitored_partition_usage)

    # one "<device>.<field>=<value>" line per partition attribute,
    # the device itself serving as the prefix only
    for nt_part in l_partitions:
        d_fields = nt_part._asdict()
        s_output += ''.join(
            '\n{}.{}={}'.format(d_fields['device'], s_key, value)
            for s_key, value in d_fields.items()
            if s_key != 'device'
        )

    # per-disk io counters go into perfdata as well
    for s_device, nt_counters in d_io_counters.items():
        for s_key, value in nt_counters._asdict().items():
            s_perfdata = add_perfdata(s_perfdata, s_device, s_key, value)

    # put it all together
    return s_output + ' | {}'.format(s_perfdata)
iotop.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def poll(interval):
    """Measure I/O activity over ``interval`` seconds.

    I/O counters are read for every process and for the disks as a
    whole, then read again after sleeping ``interval`` seconds, and the
    two readings are subtracted.

    Returns a 3-tuple: the processes that survived both readings,
    ordered from most to least total I/O, followed by the system-wide
    read-byte and write-byte deltas. The deltas are raw differences over
    the interval; they are not divided by ``interval``.
    """
    # first reading: processes whose counters cannot be read are dropped
    tracked = list(psutil.process_iter())
    for proc in list(tracked):
        try:
            proc._before = proc.io_counters()
        except psutil.Error:
            tracked.remove(proc)
    disks_before = psutil.disk_io_counters()

    time.sleep(interval)

    # second reading, plus the display details of each process; anything
    # that vanished or became a zombie in the meantime is dropped
    for proc in list(tracked):
        try:
            proc._after = proc.io_counters()
            proc._cmdline = ' '.join(proc.cmdline())
            if not proc._cmdline:
                # an empty command line falls back to the process name
                proc._cmdline = proc.name()
            proc._username = proc.username()
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            tracked.remove(proc)
    disks_after = psutil.disk_io_counters()

    # per-process deltas between the two readings
    for proc in tracked:
        bytes_read = proc._after.read_bytes - proc._before.read_bytes
        bytes_written = proc._after.write_bytes - proc._before.write_bytes
        proc._read_per_sec = bytes_read
        proc._write_per_sec = bytes_written
        proc._total = bytes_read + bytes_written

    # busiest processes first
    ranked = sorted(tracked, key=lambda item: item._total, reverse=True)

    return (
        ranked,
        disks_after.read_bytes - disks_before.read_bytes,
        disks_after.write_bytes - disks_before.write_bytes,
    )


问题


面经


文章

微信
公众号

扫码关注公众号