def test_process_create_time(self):
cmdline = "ps -o lstart -p %s" % self.pid
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0]
if PY3:
output = str(output, sys.stdout.encoding)
start_ps = output.replace('STARTED', '').strip()
hhmmss = start_ps.split(' ')[-2]
year = start_ps.split(' ')[-1]
start_psutil = psutil.Process(self.pid).create_time()
self.assertEqual(
hhmmss,
time.strftime("%H:%M:%S", time.localtime(start_psutil)))
self.assertEqual(
year,
time.strftime("%Y", time.localtime(start_psutil)))
python类Process()的实例源码
def test_serialization(self):
def check(ret):
if json is not None:
json.loads(json.dumps(ret))
a = pickle.dumps(ret)
b = pickle.loads(a)
self.assertEqual(ret, b)
check(psutil.Process().as_dict())
check(psutil.virtual_memory())
check(psutil.swap_memory())
check(psutil.cpu_times())
check(psutil.cpu_times_percent(interval=0))
check(psutil.net_io_counters())
if LINUX and not os.path.exists('/proc/diskstats'):
pass
else:
if not APPVEYOR:
check(psutil.disk_io_counters())
check(psutil.disk_partitions())
check(psutil.disk_usage(os.getcwd()))
check(psutil.users())
def test_ad_on_process_creation(self):
# We are supposed to be able to instantiate Process also in case
# of zombie processes or access denied.
with mock.patch.object(psutil.Process, 'create_time',
side_effect=psutil.AccessDenied) as meth:
psutil.Process()
assert meth.called
with mock.patch.object(psutil.Process, 'create_time',
side_effect=psutil.ZombieProcess(1)) as meth:
psutil.Process()
assert meth.called
with mock.patch.object(psutil.Process, 'create_time',
side_effect=ValueError) as meth:
with self.assertRaises(ValueError):
psutil.Process()
assert meth.called
def test_special_pid(self):
p = psutil.Process(4)
self.assertEqual(p.name(), 'System')
# use __str__ to access all common Process properties to check
# that nothing strange happens
str(p)
p.username()
self.assertTrue(p.create_time() >= 0.0)
try:
rss, vms = p.memory_info()[:2]
except psutil.AccessDenied:
# expected on Windows Vista and Windows 7
if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
raise
else:
self.assertTrue(rss > 0)
test_freeze_update_available.py 文件源码
项目:pyupdater-wx-demo
作者: wettenhj
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def PidIsRunning(pid):
"""
Check if a process with PID pid is running.
"""
try:
proc = psutil.Process(int(pid))
if proc.status == psutil.STATUS_DEAD:
return False
if proc.status == psutil.STATUS_ZOMBIE:
return False
return True # Assume other status are valid
except psutil.NoSuchProcess:
return False
def process_errored(self):
try:
exit_code = self.process.exitCode()
except Exception:
exit_code = 0
format = self.ui.edit_stdout.currentCharFormat()
format.setFontWeight(QFont.Bold)
format.setForeground(Qt.red)
self.ui.edit_stdout.setCurrentCharFormat(format)
self.ui.edit_stdout.appendPlainText('Process exited with exit code' % exit_code)
self.restore_gui()
self.ui.edit_stdout.setTextInteractionFlags(Qt.TextSelectableByMouse |
Qt.TextSelectableByKeyboard)
self.process = None
def check_processes(self):
try:
self.logger.log("Main", "informative", "Checking processes...")
for process in self.processes:
proc = psutil.Process(process.pid)
if not proc.is_running():
self.logger.log("Main", "error", "Process crashed! Exiting program.")
self.stop() # We can't trust the program after a process crashes.
self.logger.log("Main", "informative", "Processes OK!")
except Exception as e:
self.logger.log("Main", "error", "Error checking processes: " + str(e))
self.stop() # Stop here sinc a process probably died, causing this error
self.gui.after(100, self.check_processes) # Check again in 100ms
# Starts the processes
def get_rss_prop(): # this is quite expensive
process = psutil.Process(os.getpid())
return (process.memory_info().rss - process.memory_info().shared) / 10**6
def __init__(self, description, debug=False, **parameters):
self.debug = debug
if debug:
try:
import os
import psutil
import gc
self.description = description
self.params = parameters
self.start_memory = None
self.process = psutil.Process(os.getpid())
except Exception as e:
Log.warning("problem in memory measure", cause=e)
def memory_usage_psutil():
# return the memory usage in MB
import psutil
process = psutil.Process(os.getpid())
mem = process.get_memory_info()[0] / float(2 ** 20)
return mem
def getProcess():
psutil = getPSUtil()
if psutil:
import os
return psutil.Process( os.getpid() )
return None
def printMemUsage():
psutil = getPSUtil()
if psutil:
u = mem_usage()
M = psutil.virtual_memory()
print( "Process Mem: %0.1fMb System: %0.1fMb / %0.1fMb"%(u, M.used/1048576., M.total/1048576.) )
def ownhandle() :
my_regex = r".*" + re.escape(sys.argv[1]) + r".*"
regex = re.compile(my_regex, re.IGNORECASE)
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid'])
except psutil.NoSuchProcess:
pass
else:
#print pinfo['pid']
try:
proci = psutil.Process(pinfo['pid'])
for files in proci.open_files() :
#print files
#handles = re.match(my_regex, files, re.IGNORECASE)
match = regex.search(str(files))
#print match
if match is not None and pinfo['pid'] not in safepids :
return(pinfo['pid'])
except :
pass
def stop(self):
StopCheck = True
NumProcs = len(self._thread_list)
ProcsStopped = 0
while StopCheck:
for _idx, _t in enumerate(self._thread_list):
_qmsg = _t.finq
if _qmsg == 'completed_run':
_t.stop()
ProcsStopped += 1
logging.info("Stopped thread %s with pid %s with kwargs \
%s" % (_idx+1, _t.getPID(), _t))
if _qmsg == 'error':
_t.stop()
logging.info("Error on thread %s with pid %s with kwargs \
%s" % (_idx+1, _t.getPID(), _t))
StopCheck = False
if NumProcs == ProcsStopped:
StopCheck = False
sleep(0.5)
print "\n\nGoodbye..."
for pid in self._pid_list:
p = psutil.Process(pid)
p.terminate()
exit(0)
def benchmark(self):
'''Benchmark'''
process = psutil.Process()
memory = process.memory_info().rss / 2 ** 20
process.cpu_percent()
embed = discord.Embed(color = clients.bot_color)
embed.add_field(name = "RAM", value = "{:.2f} MiB".format(memory))
embed.add_field(name = "CPU", value = "Calculating CPU usage..")
message, embed = await self.bot.say(embed = embed)
await asyncio.sleep(1)
cpu = process.cpu_percent() / psutil.cpu_count()
embed.set_field_at(1, name = "CPU", value = "{}%".format(cpu))
await self.bot.edit_message(message, embed = embed)
def kill_process(pid):
proc = psutil.Process(pid)
proc.kill()
def pid_isrunning(pid):
proc = psutil.Process(pid)
return proc
def __init__(self):
self.process = psutil.Process()
super().__init__(name='memory_reporter', daemon=True)
def close_fds(self, exclude):
exclude.append(self.pipe.w)
Process.close_fds(self, exclude)
def t5(pretty, factory):
pipe = Pipe()
p = P2(pipe)
p.start()
factory.processes.append(p)
gc_pid = pipe.get()
try:
proc = psutil.Process(gc_pid)
except Exception, e:
print('FAIL %s: could not query grandchild: %s' % (pretty, e))
return False
os.kill(p.pid, signal.SIGKILL)
ok = False
for i in range(3):
try:
proc = psutil.Process(gc_pid)
except psutil.NoSuchProcess:
ok = True
break
time.sleep(0.5)
if not ok:
print('FAIL %s: could query grandchild: %s' % (pretty, proc))
return False
return True
# grandchild does not die when child dies if PDEATHSIG is unset?