def test_threads_mocked(self):
# Test the case where os.listdir() returns a file (thread)
# which no longer exists by the time we open() it (race
# condition). threads() is supposed to ignore that instead
# of raising NSP.
def open_mock(name, *args, **kwargs):
if name.startswith('/proc/%s/task' % os.getpid()):
raise IOError(errno.ENOENT, "")
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
ret = psutil.Process().threads()
assert m.called
self.assertEqual(ret, [])
# ...but if it bumps into something != ENOENT we want an
# exception.
def open_mock(name, *args, **kwargs):
if name.startswith('/proc/%s/task' % os.getpid()):
raise IOError(errno.EPERM, "")
else:
return orig_open(name, *args, **kwargs)
with mock.patch(patch_point, side_effect=open_mock):
self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
# not sure why (doesn't fail locally)
# https://travis-ci.org/giampaolo/psutil/jobs/108629915
评论列表
文章目录