def _try_except_permissionerror_iter(try_iter, except_iter):
    """Yield from ``try_iter()``, switching to ``except_iter(exc)`` on a
    permission error.

    On Python >= 3.3 only ``PermissionError`` triggers the switch.  On
    older interpreters any ``EnvironmentError`` whose ``errno`` is
    ``EPERM`` or ``EACCES`` does; other errno values are re-raised.

    :param try_iter: zero-argument callable returning an iterable.
    :param except_iter: callable taking the caught exception and
        returning an iterable.
    """
    has_permission_error = sys.version_info >= (3, 3)
    # The conditional expression is lazy, so the ``PermissionError`` name
    # is never looked up on interpreters that do not define it.
    caught_type = PermissionError if has_permission_error else EnvironmentError
    try:
        for item in try_iter():
            yield item
    except caught_type as exc:
        # Legacy interpreters catch the broad type and filter by errno.
        if not has_permission_error and exc.errno not in (EPERM, EACCES):
            raise
        for item in except_iter(exc):
            yield item
# Python example source code using errno.EACCES
def test_bind_by_addr(self, llc, raw, ldl, dlc):
    """Bind sockets to explicit addresses and check the bound address or
    the errno of the raised ``nfc.llcp.Error``.
    """
    # Address 16 is taken by the raw socket; the other two sockets are
    # then refused with EACCES when they ask for the same address.
    llc.bind(raw, 16)
    assert llc.getsockname(raw) == 16
    for sock in (ldl, dlc):
        with pytest.raises(nfc.llcp.Error) as excinfo:
            llc.bind(sock, 16)
        assert excinfo.value.errno == errno.EACCES

    # Address 63 is taken by ldl; a second bind reports EADDRINUSE.
    llc.bind(ldl, 63)
    assert llc.getsockname(ldl) == 63
    with pytest.raises(nfc.llcp.Error) as excinfo:
        llc.bind(dlc, 63)
    assert excinfo.value.errno == errno.EADDRINUSE

    # Addresses 64 and -1 are both rejected with EFAULT.
    for bad_addr in (64, -1):
        with pytest.raises(nfc.llcp.Error) as excinfo:
            llc.bind(dlc, bad_addr)
        assert excinfo.value.errno == errno.EFAULT

    llc.bind(dlc, 62)
    assert llc.getsockname(dlc) == 62
def test_selector_error(self):
    """Check that an ``OSError`` with errno ``EACCES`` raised by the alarm
    callback propagates out of ``select()``.
    """
    selector = self.make_selector()
    # The write end is kept bound to a name, as in the original.
    read_end, write_end = self.make_socketpair()
    selector.register(read_end, selectors2.EVENT_READ)

    def alarm_exception(*args):
        denied = OSError()
        denied.errno = errno.EACCES
        raise denied

    self.set_alarm(SHORT_SELECT, alarm_exception)

    try:
        selector.select(LONG_SELECT)
    except OSError as caught:
        self.assertEqual(caught.errno, errno.EACCES)
    except Exception as caught:
        self.fail("Raised incorrect exception: " + str(caught))
    else:
        self.fail("select() didn't raise OSError")
# Test ensures that _syscall_wrapper properly raises the
# exception that is raised from an interrupt handler.
def send(self, req, **kwargs):
    """Open ``req.url`` with ``urlopen`` and return a ``requests.Response``.

    On success the opened object becomes ``resp.raw``, a
    ``content-length`` header (if present) is copied to
    ``Content-Length`` and the status is ``ok``.  An ``IOError`` maps
    EACCES to ``forbidden`` and ENOENT to ``not_found``; any other
    ``IOError`` or ``OSError`` yields ``bad_request``.  ``kwargs`` is
    accepted but not used.
    """
    resp = requests.Response()
    try:
        raw = urlopen(req.url)
        resp.raw = raw
        content_length = raw.headers.get('content-length')
        if content_length is not None:
            resp.headers['Content-Length'] = content_length
    except IOError as e:
        status = requests.codes.bad_request
        if e.errno == errno.EACCES:
            status = requests.codes.forbidden
        elif e.errno == errno.ENOENT:
            status = requests.codes.not_found
        resp.status_code = status
    except OSError:
        resp.status_code = requests.codes.bad_request
    else:
        resp.status_code = requests.codes.ok
    return resp
def get_all_inodes(self):
    """Merge ``self.get_proc_inodes(pid)`` for every pid from ``pids()``
    into one dict and return it.

    An ``OSError`` with errno ENOENT, ESRCH, EPERM or EACCES skips that
    pid; any other ``OSError`` is re-raised.
    """
    # os.listdir() is gonna raise a lot of access denied exceptions in
    # case of unprivileged user; that's fine as we'll just end up
    # returning a connection with PID and fd set to None anyway.
    # Both netstat -an and lsof do the same so it's unlikely we can do
    # any better.  ENOENT just means a PID disappeared on us.
    skippable = (errno.ENOENT, errno.ESRCH, errno.EPERM, errno.EACCES)
    collected = {}
    for pid in pids():
        try:
            collected.update(self.get_proc_inodes(pid))
        except OSError as err:
            if err.errno in skippable:
                continue
            raise
    return collected
def wrap_exceptions(fun):
    """Decorator for methods: turn a raw EnvironmentError into
    NoSuchProcess (ENOENT, ESRCH) or AccessDenied (EPERM, EACCES).

    Any other errno is re-raised unchanged.  The wrapped method's
    instance must provide ``pid`` and ``_name``.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            result = fun(self, *args, **kwargs)
        except EnvironmentError as err:
            code = err.errno
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if code == errno.ENOENT or code == errno.ESRCH:
                raise NoSuchProcess(self.pid, self._name)
            elif code == errno.EPERM or code == errno.EACCES:
                raise AccessDenied(self.pid, self._name)
            else:
                raise
        return result
    return wrapper
def exe(self):
    """Return the target of the ``<procfs>/<pid>/exe`` link.

    On ENOENT/ESRCH: return ``""`` if ``<procfs>/<pid>`` still exists,
    otherwise raise NoSuchProcess or, if ``pid_exists`` says the pid is
    still there, ZombieProcess.  On EPERM/EACCES raise AccessDenied.
    Any other OSError is re-raised.
    """
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        code = err.errno
        if code in (errno.ENOENT, errno.ESRCH):
            # no such file error; might be raised also if the
            # path actually exists for system processes with
            # low pids (about 0-20)
            if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                return ""
            if pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            raise NoSuchProcess(self.pid, self._name)
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def wrap_exceptions(fun):
    """Decorator for methods: turn a raw OSError into NoSuchProcess,
    ZombieProcess or AccessDenied.

    ESRCH becomes ZombieProcess when ``pid_exists(self.pid)`` is true and
    NoSuchProcess otherwise; EPERM/EACCES become AccessDenied; any other
    errno is re-raised unchanged.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            result = fun(self, *args, **kwargs)
        except OSError as err:
            code = err.errno
            if code == errno.ESRCH:
                if pid_exists(self.pid):
                    raise ZombieProcess(self.pid, self._name, self._ppid)
                raise NoSuchProcess(self.pid, self._name)
            if code == errno.EPERM or code == errno.EACCES:
                raise AccessDenied(self.pid, self._name)
            raise
        return result
    return wrapper
def wrap_exceptions(fun):
    """Decorator for methods: map OSError errno values to NoSuchProcess,
    ZombieProcess and AccessDenied.

    ESRCH raises NoSuchProcess unless ``pid_exists(self.pid)`` reports
    the pid as present, in which case ZombieProcess is raised.  EPERM
    and EACCES raise AccessDenied.  Other errors propagate untouched.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            outcome = fun(self, *args, **kwargs)
        except OSError as err:
            errcode = err.errno
            if errcode == errno.ESRCH:
                if pid_exists(self.pid):
                    raise ZombieProcess(self.pid, self._name, self._ppid)
                raise NoSuchProcess(self.pid, self._name)
            if errcode == errno.EPERM or errcode == errno.EACCES:
                raise AccessDenied(self.pid, self._name)
            raise
        return outcome
    return wrapper
def _send_signal(self, sig):
    """Send signal ``sig`` to ``self.pid`` via ``os.kill``.

    Raises ValueError for PID 0.  An OSError from ``os.kill`` is
    translated: ESRCH becomes ZombieProcess (only when ``OPENBSD`` is
    true and ``pid_exists`` still reports the pid) or NoSuchProcess,
    after setting ``self._gone``; EPERM/EACCES become AccessDenied.  Any
    other OSError is re-raised.
    """
    if self.pid == 0:
        # see "man 2 kill"
        reason = (
            "preventing sending signal to process with PID 0 as it "
            "would affect every process in the process group of the "
            "calling process (os.getpid()) instead of PID 0")
        raise ValueError(reason)
    try:
        os.kill(self.pid, sig)
    except OSError as err:
        code = err.errno
        if code == errno.ESRCH:
            if OPENBSD and pid_exists(self.pid):
                # We do this because os.kill() lies in case of
                # zombie processes.
                raise ZombieProcess(self.pid, self._name, self._ppid)
            self._gone = True
            raise NoSuchProcess(self.pid, self._name)
        if code == errno.EPERM or code == errno.EACCES:
            raise AccessDenied(self.pid, self._name)
        raise
def wrap_exceptions(fun):
    """Decorator for methods: run ``fun`` inside a try/except clause and
    translate EnvironmentError errno values into library exceptions.

    ENOENT and ESRCH raise NoSuchProcess, or ZombieProcess when
    ``pid_exists(self.pid)`` still reports the pid.  EPERM and EACCES
    raise AccessDenied.  Any other error is re-raised unchanged.

    The wrapper carries over the name and docstring of ``fun``, so
    decorated methods stay introspectable.
    """
    # Imported locally so the block does not depend on a module-level
    # import that may not exist in this file.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except EnvironmentError as err:
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if err.errno in (errno.ENOENT, errno.ESRCH):
                if not pid_exists(self.pid):
                    raise NoSuchProcess(self.pid, self._name)
                else:
                    raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def get_all_inodes(self):
    """Return one dict combining ``self.get_proc_inodes(pid)`` for each
    pid yielded by ``pids()``.

    Pids whose lookup fails with ENOENT, ESRCH, EPERM or EACCES are
    skipped; an OSError with any other errno propagates.
    """
    # os.listdir() is gonna raise a lot of access denied exceptions in
    # case of unprivileged user; that's fine as we'll just end up
    # returning a connection with PID and fd set to None anyway.
    # Both netstat -an and lsof do the same so it's unlikely we can do
    # any better.  ENOENT just means a PID disappeared on us.
    tolerated = (errno.ENOENT, errno.ESRCH, errno.EPERM, errno.EACCES)
    result = {}
    for pid in pids():
        try:
            result.update(self.get_proc_inodes(pid))
        except OSError as err:
            if err.errno in tolerated:
                continue
            raise
    return result
def exe(self):
    """Resolve the ``<procfs>/<pid>/exe`` link and return its target.

    ENOENT/ESRCH: return ``""`` when ``<procfs>/<pid>`` exists; else
    raise ZombieProcess if ``pid_exists`` reports the pid, NoSuchProcess
    if not.  EPERM/EACCES: raise AccessDenied.  Other OSErrors are
    re-raised.
    """
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        missing = err.errno in (errno.ENOENT, errno.ESRCH)
        if missing:
            # no such file error; might be raised also if the
            # path actually exists for system processes with
            # low pids (about 0-20)
            if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                return ""
            if pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            raise NoSuchProcess(self.pid, self._name)
        if err.errno in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def wrap_exceptions(fun):
    """Decorator for methods: convert a raw OSError into NoSuchProcess,
    ZombieProcess or AccessDenied based on its errno.

    ESRCH yields ZombieProcess if ``pid_exists(self.pid)`` is true, else
    NoSuchProcess.  EPERM/EACCES yield AccessDenied.  Everything else is
    re-raised as is.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            value = fun(self, *args, **kwargs)
        except OSError as err:
            number = err.errno
            if number == errno.ESRCH:
                if pid_exists(self.pid):
                    raise ZombieProcess(self.pid, self._name, self._ppid)
                raise NoSuchProcess(self.pid, self._name)
            if number == errno.EPERM or number == errno.EACCES:
                raise AccessDenied(self.pid, self._name)
            raise
        return value
    return wrapper
def _send_signal(self, sig):
    """Deliver ``sig`` to ``self.pid`` with ``os.kill``.

    PID 0 is refused with ValueError.  OSError from ``os.kill`` maps as
    follows: ESRCH raises ZombieProcess when ``OPENBSD`` is true and
    ``pid_exists`` still reports the pid, otherwise sets ``self._gone``
    and raises NoSuchProcess; EPERM/EACCES raise AccessDenied; anything
    else is re-raised.
    """
    if self.pid == 0:
        # see "man 2 kill"
        refusal = (
            "preventing sending signal to process with PID 0 as it "
            "would affect every process in the process group of the "
            "calling process (os.getpid()) instead of PID 0")
        raise ValueError(refusal)
    try:
        os.kill(self.pid, sig)
    except OSError as err:
        errcode = err.errno
        if errcode == errno.ESRCH:
            if OPENBSD and pid_exists(self.pid):
                # We do this because os.kill() lies in case of
                # zombie processes.
                raise ZombieProcess(self.pid, self._name, self._ppid)
            self._gone = True
            raise NoSuchProcess(self.pid, self._name)
        if errcode == errno.EPERM or errcode == errno.EACCES:
            raise AccessDenied(self.pid, self._name)
        raise
def wrap_exceptions(fun):
    """Decorator for methods: call ``fun`` inside a try/except clause and
    translate EnvironmentError errno values into library exceptions.

    ENOENT and ESRCH raise NoSuchProcess, or ZombieProcess when
    ``pid_exists(self.pid)`` still reports the pid.  EPERM and EACCES
    raise AccessDenied.  Any other error is re-raised unchanged.

    The name and docstring of ``fun`` are copied onto the wrapper, so
    decorated methods stay introspectable.
    """
    # Imported locally so the block does not depend on a module-level
    # import that may not exist in this file.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except EnvironmentError as err:
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if err.errno in (errno.ENOENT, errno.ESRCH):
                if not pid_exists(self.pid):
                    raise NoSuchProcess(self.pid, self._name)
                else:
                    raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def test_non_retryable_exception_handling(self):
    """
    Tests a resumable upload that fails with a non-retryable exception
    """
    # The progress callback raises OSError(EACCES); the upload is
    # expected to let it propagate.
    harness = CallbackTestHarness(
        exception=OSError(errno.EACCES, 'Permission denied'))
    res_upload_handler = ResumableUploadHandler(num_retries=1)
    _, small_src_file = self.make_small_file()
    small_src_file.seek(0)
    dst_key = self._MakeKey(set_contents=False)
    try:
        dst_key.set_contents_from_file(
            small_src_file, cb=harness.call,
            res_upload_handler=res_upload_handler)
    except OSError as e:
        # Ensure the error was re-raised.
        self.assertEqual(e.errno, errno.EACCES)
    else:
        self.fail('Did not get expected OSError')
def test_non_retryable_exception_handling(self):
    """
    Tests resumable download that fails with a non-retryable exception
    """
    # The progress callback raises OSError(EACCES); the download is
    # expected to let it propagate.
    harness = CallbackTestHarness(
        exception=OSError(errno.EACCES, 'Permission denied'))
    res_download_handler = ResumableDownloadHandler(num_retries=1)
    dst_fp = self.make_dst_fp()
    _, small_src_key = self.make_small_key()
    try:
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
    except OSError as e:
        # Ensure the error was re-raised.
        self.assertEqual(e.errno, errno.EACCES)
    else:
        self.fail('Did not get expected OSError')
def test_get_ioerror_slow_path(cache):
    """Call ``cache.get(0)`` while the disk's ``fetch`` raises an IOError
    with errno EACCES, under the least-recently-used eviction policy.

    The test makes no assertion on the result; it passes as long as
    ``cache.get`` does not raise.
    """
    cache.reset('eviction_policy', 'least-recently-used')
    cache.set(0, 0)

    denied = IOError()
    denied.errno = errno.EACCES

    fake_disk = mock.Mock()
    fake_disk.put = mock.Mock(side_effect=[(0, True)])
    fake_disk.fetch = mock.Mock(side_effect=denied)

    with mock.patch.object(cache, '_disk', fake_disk):
        cache.get(0)
def test_pop_ioerror_eacces(cache):
    """Call ``cache.pop(0)`` while the disk's ``fetch`` raises an IOError
    with errno EACCES.

    Only the initial ``cache.set`` is asserted; the test otherwise
    passes as long as ``cache.pop`` does not raise.
    """
    assert cache.set(0, 0)

    denied = IOError()
    denied.errno = errno.EACCES

    fake_disk = mock.Mock()
    fake_disk.put = mock.Mock(side_effect=[(0, True)])
    fake_disk.fetch = mock.Mock(side_effect=denied)

    with mock.patch.object(cache, '_disk', fake_disk):
        cache.pop(0)