def tell_system_status(self):
import psutil
import platform
import datetime
os, name, version, _, _, _ = platform.uname()
version = version.split('-')[0]
cores = psutil.cpu_count()
cpu_percent = psutil.cpu_percent()
memory_percent = psutil.virtual_memory()[2]
disk_percent = psutil.disk_usage('/')[3]
boot_time = datetime.datetime.fromtimestamp(psutil.boot_time())
running_since = boot_time.strftime("%A %d. %B %Y")
response = "I am currently running on %s version %s. " % (os, version)
response += "This system is named %s and has %s CPU cores. " % (name, cores)
response += "Current disk_percent is %s percent. " % disk_percent
response += "Current CPU utilization is %s percent. " % cpu_percent
response += "Current memory utilization is %s percent. " % memory_percent
response += "it's running since %s." % running_since
return response
python类cpu_count()的实例源码
def check(request):
return {
'hostname': socket.gethostname(),
'ips': ips,
'cpus': psutil.cpu_count(),
'uptime': timesince(datetime.fromtimestamp(psutil.boot_time())),
'memory': {
'total': filesizeformat(psutil.virtual_memory().total),
'available': filesizeformat(psutil.virtual_memory().available),
'used': filesizeformat(psutil.virtual_memory().used),
'free': filesizeformat(psutil.virtual_memory().free),
'percent': psutil.virtual_memory().percent
},
'swap': {
'total': filesizeformat(psutil.swap_memory().total),
'used': filesizeformat(psutil.swap_memory().used),
'free': filesizeformat(psutil.swap_memory().free),
'percent': psutil.swap_memory().percent
}
}
def __init__(self, cpu_histogram):
super().__init__()
# set up the graphical elements
layout = QGridLayout(self)
self.setLayout(layout)
fig = Figure()
layout.addWidget(FigureCanvas(fig))
# do the plotting
ax = fig.add_subplot(1, 1, 1) # 1x1 grid, first subplot
ax.set_title('CPU Usage Histogram (%s Cores/%s Threads)' % (psutil.cpu_count(False), psutil.cpu_count(True)))
ax.set_ylabel('Count')
ax.set_xlabel('CPU %')
ax.grid(True)
xs = range(0, 101)
ax.plot(xs, [cpu_histogram[x] for x in xs])
ax.xaxis.set_major_locator(MultipleLocator(10.))
self.show()
def cpu_load():
count = psutil.cpu_count()
condition_cpu_loop = True
mem = Memory.Free_Space()
if mem < 200:
print "Achtung, sehr wenig Speicher frei!"
while (condition_cpu_loop == True):
cpu_load = psutil.cpu_percent(interval=cpu_interval)
print(cpu_load)
cpu_load_finish = cpu_load
if(cpu_load > cpu_load_warning):
condition_cpu_loop = False
print("Warning Warning")
print Pids.Pi(count, cpu_load_finish)
return(cpu_load)
def __init__(self):
self.conn = None
self.dirty_scene = False
self.guest = None
self.guest_mapping_by_uuid = dict()
self.hostname = ji.Common.get_hostname()
self.node_id = uuid.getnode()
self.cpu = psutil.cpu_count()
self.memory = psutil.virtual_memory().total
self.interfaces = dict()
self.disks = dict()
self.guest_callbacks = list()
self.interval = 60
# self.last_host_cpu_time = dict()
self.last_host_traffic = dict()
self.last_host_disk_io = dict()
self.last_guest_cpu_time = dict()
self.last_guest_traffic = dict()
self.last_guest_disk_io = dict()
self.ts = ji.Common.ts()
self.ssh_client = None
def wait_for_idle(self, timeout=30):
"""Wait for the system to go idle for at least 2 seconds"""
import monotonic
import psutil
logging.debug("Waiting for Idle...")
cpu_count = psutil.cpu_count()
if cpu_count > 0:
target_pct = 50. / float(cpu_count)
idle_start = None
end_time = monotonic.monotonic() + timeout
idle = False
while not idle and monotonic.monotonic() < end_time:
self.alive()
check_start = monotonic.monotonic()
pct = psutil.cpu_percent(interval=0.5)
if pct <= target_pct:
if idle_start is None:
idle_start = check_start
if monotonic.monotonic() - idle_start > 2:
idle = True
else:
idle_start = None
def wait_for_idle(self):
"""Wait for no more than 50% of a single core used for 500ms"""
import psutil
logging.debug("Waiting for Idle...")
cpu_count = psutil.cpu_count()
if cpu_count > 0:
target_pct = 50. / float(cpu_count)
idle_start = None
end_time = monotonic.monotonic() + self.START_BROWSER_TIME_LIMIT
idle = False
while not idle and monotonic.monotonic() < end_time:
check_start = monotonic.monotonic()
pct = psutil.cpu_percent(interval=0.1)
if pct <= target_pct:
if idle_start is None:
idle_start = check_start
if monotonic.monotonic() - idle_start >= 0.4:
idle = True
else:
idle_start = None
def get_logical_cpu_count():
if psutil is not None:
# Number of logical CPUs
cpu_count = psutil.cpu_count()
elif hasattr(os, 'cpu_count'):
# Python 3.4
cpu_count = os.cpu_count()
else:
cpu_count = None
try:
import multiprocessing
except ImportError:
pass
else:
try:
cpu_count = multiprocessing.cpu_count()
except NotImplementedError:
pass
if cpu_count is not None and cpu_count < 1:
return None
return cpu_count
def __init__(self, run_config, exe_path, max_cpu_time, max_memory, test_case_id,
submission_dir, spj_version, spj_config, output=False):
self._run_config = run_config
self._exe_path = exe_path
self._max_cpu_time = max_cpu_time
self._max_memory = max_memory
self._max_real_time = self._max_cpu_time * 3
self._test_case_id = test_case_id
self._test_case_dir = os.path.join(TEST_CASE_DIR, test_case_id)
self._submission_dir = submission_dir
self._pool = Pool(processes=psutil.cpu_count())
self._test_case_info = self._load_test_case_info()
self._spj_version = spj_version
self._spj_config = spj_config
self._output = output
if self._spj_version and self._spj_config:
self._spj_exe = os.path.join(SPJ_EXE_DIR, self._spj_config["exe_name"].format(spj_version=self._spj_version))
if not os.path.exists(self._spj_exe):
raise JudgeClientError("spj exe not found")
def status(cmd, message, args):
os_icon, os_color = get_os_icon()
general_text = f'Latency: **{int(cmd.bot.latency * 1000)}ms**'
general_text += f'\nPlatform: **{sys.platform.upper()}**'
general_text += f'\nStarted: **{arrow.get(psutil.boot_time()).humanize()}**'
cpu_clock = psutil.cpu_freq()
if cpu_clock:
cpu_clock = cpu_clock.current
else:
cpu_clock = 'Unknown'
cpu_text = f'Count: **{psutil.cpu_count()} ({psutil.cpu_count(logical=False)})**'
cpu_text += f'\nUsage: **{psutil.cpu_percent()}%**'
cpu_text += f'\nClock: **{cpu_clock} MHz**'
used_mem = humanfriendly.format_size(psutil.virtual_memory().available, binary=True)
total_mem = humanfriendly.format_size(psutil.virtual_memory().total, binary=True)
mem_text = f'Used: **{used_mem}**'
mem_text += f'\nTotal: **{total_mem}**'
mem_text += f'\nPercent: **{int(100 - psutil.virtual_memory().percent)}%**'
response = discord.Embed(color=os_color)
response.set_author(name=socket.gethostname(), icon_url=os_icon)
response.add_field(name='General', value=general_text)
response.add_field(name='CPU', value=cpu_text)
response.add_field(name='Memory', value=mem_text)
await message.channel.send(embed=response)
def get_cpu_info(self):
res = {}
try:
# windows 2008 multiprocessing.cpu_count()?????
from cpuinfo import cpuinfo
info = cpuinfo.get_cpu_info()
res['name'] = info['brand']
res['hz'] = info['hz_advertised']
res['brand'] = info['brand']
res['architecture'] = info['arch']
res['physical_cores'] = psutil.cpu_count(logical=False)
res['logical_cores'] = psutil.cpu_count()
except:
res = {}
self.logger.error(traceback.format_exc())
return res
def test_cpu_freq(self):
def check_ls(ls):
for nt in ls:
self.assertLessEqual(nt.current, nt.max)
for name in nt._fields:
value = getattr(nt, name)
self.assertIsInstance(value, (int, long, float))
self.assertGreaterEqual(value, 0)
ls = psutil.cpu_freq(percpu=True)
if TRAVIS and not ls:
return
assert ls, ls
check_ls([psutil.cpu_freq(percpu=False)])
if LINUX:
self.assertEqual(len(ls), psutil.cpu_count())
def test_cpu_freq(self):
def check_ls(ls):
for nt in ls:
self.assertLessEqual(nt.current, nt.max)
for name in nt._fields:
value = getattr(nt, name)
self.assertIsInstance(value, (int, long, float))
self.assertGreaterEqual(value, 0)
ls = psutil.cpu_freq(percpu=True)
if TRAVIS and not ls:
return
assert ls, ls
check_ls([psutil.cpu_freq(percpu=False)])
if LINUX:
self.assertEqual(len(ls), psutil.cpu_count())
getSystemStatus.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def getLoadAverage():
if linux:
import multiprocessing
k = 1.0
k /= multiprocessing.cpu_count()
if os.path.exists('/proc/loadavg'):
return [float(open('/proc/loadavg').read().split()[x]) * k for x in range(3)]
else:
tokens = subprocess.check_output(['uptime']).split()
return [float(x.strip(',')) * k for x in tokens[-3:]]
if mswindows:
# TODO(Guodong Ding) get this field data like on Linux for Windows
# print psutil.cpu_percent()
# print psutil.cpu_times_percent()
# print psutil.cpu_times()
# print psutil.cpu_stats()
return "%.2f%%" % psutil.cpu_percent()
def num_cpus(self):
# NOTE: use cpu_count if present (16.04 support)
if hasattr(psutil, 'cpu_count'):
return psutil.cpu_count()
else:
return psutil.NUM_CPUS
def show_platform():
''' Show information on platform'''
swrite('\n=== SYSTEM ===\n\n')
try:
linux_distribution = platform.linux_distribution()
except:
linux_distribution = "N/A"
swrite("""
dist: %s
linux_distribution: %s
system: %s
machine: %s
platform: %s
uname: %s
version: %s
mac_ver: %s
memory: %s
number of CPU: %s
""" % (
str(platform.dist()),
linux_distribution,
platform.system(),
platform.machine(),
platform.platform(),
platform.uname(),
platform.version(),
platform.mac_ver(),
psutil.virtual_memory(),
str(psutil.cpu_count())
))
def help_cpus(self):
msg = QMessageBox()
msg.setIcon(QMessageBox.Question)
msg.setText("Setting the number of CPUs")
msg.setWindowTitle("Number of CPUs")
msg.setInformativeText("SpyKING CIRCUS can use several CPUs "
"either locally or on multiple machines "
"using MPI (see documentation) "
"\n"
"\n"
"You have %d local CPUs available" %psutil.cpu_count()
)
msg.setStandardButtons(QMessageBox.Close)
msg.setDefaultButton(QMessageBox.Close)
answer = msg.exec_()
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def benchmark(self):
'''Benchmark'''
process = psutil.Process()
memory = process.memory_info().rss / 2 ** 20
process.cpu_percent()
embed = discord.Embed(color = clients.bot_color)
embed.add_field(name = "RAM", value = "{:.2f} MiB".format(memory))
embed.add_field(name = "CPU", value = "Calculating CPU usage..")
message, embed = await self.bot.say(embed = embed)
await asyncio.sleep(1)
cpu = process.cpu_percent() / psutil.cpu_count()
embed.set_field_at(1, name = "CPU", value = "{}%".format(cpu))
await self.bot.edit_message(message, embed = embed)
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def _num_cpus():
'''
Compatibility wrapper for calculating the number of CPU's
a unit has.
@returns: int: number of CPU cores detected
'''
try:
return psutil.cpu_count()
except AttributeError:
return psutil.NUM_CPUS
def beta_phylogenetic_alt(table: BIOMV210Format, phylogeny: NewickFormat,
metric: str, n_jobs: int=1,
variance_adjusted: bool=False,
alpha: float=None,
bypass_tips: bool=False) -> skbio.DistanceMatrix:
metrics = phylogenetic_metrics_alt_dict()
generalized_unifrac = 'generalized_unifrac'
if metric not in metrics:
raise ValueError("Unknown metric: %s" % metric)
if alpha is not None and metric != generalized_unifrac:
raise ValueError('The alpha parameter is only allowed when the choice'
' of metric is generalized_unifrac')
# this behaviour is undefined, so let's avoid a seg fault
cpus = psutil.cpu_count(logical=False)
if n_jobs > cpus:
raise ValueError('The value of n_jobs cannot exceed the number of '
'processors (%d) available in this system.' % cpus)
if metric == generalized_unifrac:
alpha = 1.0 if alpha is None else alpha
f = partial(metrics[metric], alpha=alpha)
else:
f = metrics[metric]
# unifrac processes tables and trees should be filenames
return f(str(table), str(phylogeny), threads=n_jobs,
variance_adjusted=variance_adjusted, bypass_tips=bypass_tips)
def clean(self):
ram = self.cleaned_data.get('ram', None)
cores = self.cleaned_data.get('cores', None)
if cores:
cores = random_sample(range(0, cpu_count()), cores)
cores = ','.join(map(str, cores))
self.cleaned_data['cores'] = cores
else:
cores = cpu_count()
cores = str(list(range(0, cores))).strip('[]').replace(" ", "")
self.cleaned_data['cores'] = cores
if ram:
ram = int(ram)
self.cleaned_data['ram'] = ram
else:
ram = int(DHost.memory('total'))
self.cleaned_data['ram'] = ram
if self.ssh_users:
for ssh_user in self.ssh_users:
user_obj = User.objects.get(email=ssh_user)
if not user_obj.ssh_pub_key:
raise forms.ValidationError("SSH key not found")
return self.cleaned_data
def cpu(self, type):
if type == 'count':
return psutil.cpu_count()
elif type == 'cpu_percent':
return psutil.cpu_percent(interval=1)
def show_platform():
''' Show information on platform'''
swrite('\n=== SYSTEM ===\n\n')
try:
linux_distribution = platform.linux_distribution()
except:
linux_distribution = "N/A"
swrite("""
dist: %s
linux_distribution: %s
system: %s
machine: %s
platform: %s
uname: %s
version: %s
mac_ver: %s
memory: %s
number of CPU: %s
""" % (
str(platform.dist()),
linux_distribution,
platform.system(),
platform.machine(),
platform.platform(),
platform.uname(),
platform.version(),
platform.mac_ver(),
psutil.virtual_memory(),
str(psutil.cpu_count())
))
def get(self, request, show=False):
if show == 'all':
show = True
cpu_count = get_cpu_count()
return ('processes.html', {
'panel_pid': prism.settings.PANEL_PID,
'cpu_count': cpu_count[0],
'cpu_count_logical': cpu_count[1],
'ram': psutil.virtual_memory()[0],
'processes': self.get_processes(show,
lambda x: x['memory_percent'] + x['cpu_percent'])
})