def safe_listdir(path):
    """
    Return the entries of ``path``, or ``()`` when it cannot be listed.

    A missing path, a path that is not a directory and a permission
    failure are all treated as "nothing to list"; any other ``OSError``
    is propagated to the caller.
    """
    try:
        entries = os.listdir(path)
    except (PermissionError, NotADirectoryError):
        return ()
    except OSError as exc:
        # Does not exist, not a directory, or permission denied.
        if exc.errno in (errno.ENOTDIR, errno.EACCES, errno.ENOENT):
            return ()
        # Python 2 on Windows needs the winerror code checked instead :(
        if getattr(exc, "winerror", None) == 267:
            return ()
        raise
    return entries
# Python source code examples that use errno.EACCES
def __init__(self, path, factory=None, create=True):
    """Initialize a single-file mailbox.

    The file at ``self._path`` is opened for reading and writing.  When it
    does not exist it is created if ``create`` is true, otherwise
    ``NoSuchMailboxError`` is raised.  When it cannot be opened for
    writing (``EACCES`` or ``EROFS``) it is opened read-only instead.
    Any other ``IOError`` propagates unchanged.
    """
    Mailbox.__init__(self, path, factory, create)
    try:
        f = open(self._path, 'rb+')
    except IOError as e:
        # ``except ... as`` is accepted by Python 2.6+ and Python 3; the
        # older comma form is a syntax error on Python 3.
        if e.errno == errno.ENOENT:
            if create:
                f = open(self._path, 'wb+')
            else:
                raise NoSuchMailboxError(self._path)
        elif e.errno in (errno.EACCES, errno.EROFS):
            f = open(self._path, 'rb')
        else:
            raise
    self._file = f
    self._toc = None
    self._next_key = 0
    self._pending = False   # No changes require rewriting the file.
    self._locked = False
    self._file_length = None        # Used to record mailbox size
def __init__(self, debug=False, logfile=None):
    """Create the 'VirtualBMC' logger and attach its output handler.

    :param debug: when true the logger level is DEBUG, otherwise INFO.
    :param logfile: path of a file to log to; when ``None`` a
        ``logging.StreamHandler`` is used instead.
    :raises IOError: if setting up the handler fails for any reason
        other than permission denied (``EACCES``).
    """
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()
        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)
        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    except IOError as e:
        # Only a permission failure is tolerated (the logger is then left
        # without handler and level).  Anything else used to be swallowed
        # silently by accident and is now reported to the caller.
        if e.errno != errno.EACCES:
            raise
def safe_listdir(path):
    """
    Return the entries of ``path``, or ``()`` when it cannot be listed.

    A missing path, a path that is not a directory and a permission
    failure are all treated as "nothing to list"; any other ``OSError``
    is propagated to the caller.
    """
    try:
        entries = os.listdir(path)
    except (PermissionError, NotADirectoryError):
        return ()
    except OSError as exc:
        # Does not exist, not a directory, or permission denied.
        if exc.errno in (errno.ENOTDIR, errno.EACCES, errno.ENOENT):
            return ()
        # Python 2 on Windows needs the winerror code checked instead :(
        if getattr(exc, "winerror", None) == 267:
            return ()
        raise
    return entries
def live(self):
    """Write one generated value into a randomly chosen proc entry.

    Does nothing when ``self.proc_path`` is empty.  Otherwise a key is
    picked from ``self.proc_keys``, a value is obtained from
    ``self.generator.createValue()`` and written to the file
    ``proc_path/key``.

    Write failures are handled by errno:
    EINVAL / EPERM are ignored; ENOENT / EACCES are logged through
    ``self.error`` and the key is dropped with ``self.removeKey``; any
    other ``IOError`` propagates.
    """
    if not self.proc_path:
        return
    self.info("Proc path: %s" % self.proc_path)
    key = choice(self.proc_keys)
    filename = path_join(self.proc_path, key)
    data = self.generator.createValue()
    self.info("Write data in %s: (len=%s) %r"
              % (filename, len(data), data))
    try:
        # The context manager closes the file even when write() fails, so
        # the handle is no longer leaked on error.
        with open(filename, 'w') as output:
            output.write(data)
    except IOError as err:
        if err.errno in (EINVAL, EPERM):
            # The entry refused this value: nothing to do.
            pass
        elif err.errno in (ENOENT, EACCES):
            self.error("Unable to write %s: %s" % (filename, err))
            self.removeKey(key)
        else:
            raise
def readBytes(self, address, size):
    """Read ``size`` bytes of process memory starting at ``address``.

    The first call tries to open ``/proc/<pid>/mem`` unbuffered.  If that
    fails, the failure is logged (as info for EACCES, as error
    otherwise), ``self.readBytes`` is rebound to ``self._readBytes`` (the
    ptrace fallback named in the log message) and the read is delegated
    to it.

    Raises ``ProcessError`` when seeking or reading the memory file fails
    with ``IOError`` or ``ValueError``.
    """
    debug("Read %s bytes at %s" % (size, formatAddress(address)))
    if not self.read_mem_file:
        filename = '/proc/%u/mem' % self.pid
        try:
            self.read_mem_file = open(filename, 'rb', 0)
        except IOError as err:
            message = "Unable to open %s: fallback to ptrace implementation" % filename
            if err.errno != EACCES:
                error(message)
            else:
                info(message)
            # Switch permanently to the fallback for later calls.
            self.readBytes = self._readBytes
            return self.readBytes(address, size)
    try:
        mem = self.read_mem_file
        mem.seek(address)
        return mem.read(size)
    except (IOError, ValueError) as err:
        raise ProcessError(self, "readBytes(%s, %s) error: %s" % (
            formatAddress(address), size, err))
def get_free_port(port, port_retries):
    """Return a port number that could be bound, or ``None``.

    Candidates come from ``random_ports(port, port_retries + 1)``.  Each
    one is probed by binding a fresh socket on all interfaces.  Ports
    that are in use or that may not be listened on are reported with
    ``print`` and skipped; any other socket error propagates.
    """
    for candidate in random_ports(port, port_retries+1):
        s = None
        try:
            s = socket.socket()
            s.bind(('', candidate))
            return s.getsockname()[1]
        except socket.error as e:
            if e.errno == errno.EADDRINUSE:
                print('The port %i is already in use, trying another port.' % candidate)
            elif e.errno in (errno.EACCES, getattr(errno, 'WSAEACCES', errno.EACCES)):
                print("Permission to listen on port %i denied" % candidate)
            else:
                raise
        finally:
            # Close the probe socket on every path; previously it was
            # leaked whenever bind() failed.
            if s is not None:
                s.close()
    return None
# http://stackoverflow.com/questions/13593223/making-sure-a-python-script-with-subprocesses-dies-on-sigint
def set_directory(self, directory):
    """Set the directory where the downloaded file will be placed.

    The path must exist, be a directory, and be both writable and
    searchable; otherwise OSError is raised with ENOENT, ENOTDIR or
    EACCES respectively and ``self.destDir`` is left untouched.
    """
    problem = None
    if not path.exists(directory):
        problem = (errno.ENOENT, "You see no directory there.")
    elif not path.isdir(directory):
        problem = (errno.ENOTDIR, "You cannot put a file into "
                   "something which is not a directory.")
    elif not os.access(directory, os.X_OK | os.W_OK):
        problem = (errno.EACCES,
                   "This directory is too hard to write in to.")
    if problem is not None:
        code, reason = problem
        raise OSError(code, reason, directory)
    self.destDir = directory
def _testDetectFilesRemoved(self):
    """Check plugin discovery after the ``pluginextra`` module is unimported.

    Writes ``pluginextra.py`` next to ``plugins.__file__``, lists the
    plugins once to generate a cache that contains it, unimports the
    module via ``self._unimportPythonModule`` and finally asserts that
    exactly one ``ITestPlugin`` provider is found.

    Raises ``unittest.SkipTest`` when the plugin directory is not
    writable (EACCES).
    """
    writeFileName = sibpath(plugins.__file__, 'pluginextra.py')
    try:
        # open() instead of the Python 2-only file() builtin.
        wf = open(writeFileName, 'w')
    except IOError as ioe:
        if ioe.errno == errno.EACCES:
            raise unittest.SkipTest(
                "No permission to add things to twisted.plugins")
        else:
            raise
    else:
        try:
            # Close the file even if the write fails.
            with wf:
                wf.write(begintest)
            # Generate a cache with pluginextra in it.
            list(plugin.getPlugins(plugin.ITestPlugin))
        finally:
            # NOTE(review): the second argument presumably asks for the
            # source file to be removed as well - confirm against the
            # helper's definition.
            self._unimportPythonModule(
                sys.modules['twisted.plugins.pluginextra'],
                True)
        plgs = list(plugin.getPlugins(plugin.ITestPlugin))
        self.assertEqual(1, len(plgs))
def _mkstemp_inner(dir, pre, suf, flags):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names of the form ``pre + name + suf``
    inside ``dir`` and opens the first one that can be created with
    ``flags`` and mode 0o600.

    Returns a ``(fd, absolute_path)`` tuple.  Raises ``IOError`` with
    EEXIST when every candidate name was already taken; other
    ``OSError`` failures propagate immediately.
    """
    names = _get_candidate_names()
    for _ in range(TMP_MAX):
        name = next(names)
        candidate = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(candidate, flags, 0o600)
            _set_cloexec(fd)
            return (fd, _os.path.abspath(candidate))
        except OSError as e:
            if e.errno == _errno.EEXIST:
                continue    # try again
            if _os.name == 'nt' and e.errno == _errno.EACCES:
                # On windows, when a directory with the chosen name already
                # exists, EACCES error code is returned instead of EEXIST.
                continue
            raise
    raise IOError(_errno.EEXIST, "No usable temporary file name found")
# User visible interfaces.
def __init__(self, path, factory=None, create=True):
    """Initialize a single-file mailbox.

    The file at ``self._path`` is opened for reading and writing.  When it
    does not exist it is created if ``create`` is true, otherwise
    ``NoSuchMailboxError`` is raised.  When it cannot be opened for
    writing (``EACCES`` or ``EROFS``) it is opened read-only instead.
    Any other ``IOError`` propagates unchanged.
    """
    Mailbox.__init__(self, path, factory, create)
    try:
        f = open(self._path, 'rb+')
    except IOError as e:
        # ``except ... as`` is accepted by Python 2.6+ and Python 3; the
        # older comma form is a syntax error on Python 3.
        if e.errno == errno.ENOENT:
            if create:
                f = open(self._path, 'wb+')
            else:
                raise NoSuchMailboxError(self._path)
        elif e.errno in (errno.EACCES, errno.EROFS):
            f = open(self._path, 'rb')
        else:
            raise
    self._file = f
    self._toc = None
    self._next_key = 0
    self._pending = False       # No changes require rewriting the file.
    self._pending_sync = False  # No need to sync the file
    self._locked = False
    self._file_length = None    # Used to record mailbox size
def acquire(self, blocking=False):
    """Try to take an exclusive ``flock`` lock on ``self.filename``.

    The file is created if needed and its descriptor stored in
    ``self.fd``.  With ``blocking`` false the attempt is non-blocking.

    Returns True (and sets ``self.locked``) when the lock was obtained.
    Returns False when the lock is unavailable (EAGAIN or EACCES).  Any
    other ``IOError`` propagates.  On every failure the descriptor is
    closed and ``self.fd`` reset to None.
    """
    import fcntl  # @UnresolvedImport
    flags = os.O_CREAT | os.O_WRONLY
    self.fd = os.open(self.filename, flags)
    mode = fcntl.LOCK_EX
    if not blocking:
        mode |= fcntl.LOCK_NB
    try:
        fcntl.flock(self.fd, mode)
    except IOError:
        # sys.exc_info() keeps the code valid on old and new interpreters.
        e = sys.exc_info()[1]
        # Release the descriptor before deciding what to do: previously it
        # stayed open when an unexpected error was re-raised.
        os.close(self.fd)
        self.fd = None
        if e.errno not in (errno.EAGAIN, errno.EACCES):
            raise
        return False
    self.locked = True
    return True
def acquire(self, blocking=False):
    """Try to lock the first byte of ``self.filename`` with ``msvcrt``.

    The file is created if needed and its descriptor stored in
    ``self.fd``.  ``blocking`` selects ``LK_LOCK`` instead of the
    non-blocking ``LK_NBLCK`` mode.

    Returns True when the lock was obtained and False when it is
    unavailable (EAGAIN, EACCES or EDEADLK).  Any other ``IOError``
    propagates.  On every failure the descriptor is closed and
    ``self.fd`` reset to None.
    """
    import msvcrt  # @UnresolvedImport
    flags = os.O_CREAT | os.O_WRONLY
    mode = msvcrt.LK_NBLCK
    if blocking:
        mode = msvcrt.LK_LOCK
    self.fd = os.open(self.filename, flags)
    try:
        msvcrt.locking(self.fd, mode, 1)
    except IOError:
        # sys.exc_info() keeps the code valid on old and new interpreters.
        e = sys.exc_info()[1]
        # Release the descriptor before deciding what to do: previously it
        # stayed open when an unexpected error was re-raised.
        os.close(self.fd)
        self.fd = None
        if e.errno not in (errno.EAGAIN, errno.EACCES, errno.EDEADLK):
            raise
        return False
    return True
def _try_acquire(self, blocking, watch):
    """Make one attempt at ``self.trylock()``.

    Returns True on success.  When the lock is held elsewhere (EACCES or
    EAGAIN) returns False if the caller is not blocking or ``watch`` has
    expired, and raises ``_utils.RetryAgain`` otherwise.  Any other
    ``IOError`` is converted into ``threading.ThreadError``.
    """
    try:
        self.trylock()
    except IOError as e:
        contended = e.errno in (errno.EACCES, errno.EAGAIN)
        if not contended:
            details = {
                'path': self.path,
                'exception': e,
            }
            raise threading.ThreadError("Unable to acquire lock on"
                                        " `%(path)s` due to"
                                        " %(exception)s" % details)
        if blocking and not watch.expired():
            raise _utils.RetryAgain()
        return False
    return True
def get_all_inodes(self):
    """Merge the results of ``get_proc_inodes`` for every PID in ``pids()``.

    PIDs whose lookup fails with ENOENT, ESRCH, EPERM or EACCES are
    skipped; any other ``OSError`` propagates.
    """
    # os.listdir() is gonna raise a lot of access denied
    # exceptions in case of unprivileged user; that's fine
    # as we'll just end up returning a connection with PID
    # and fd set to None anyway.
    # Both netstat -an and lsof does the same so it's
    # unlikely we can do any better.
    # ENOENT just means a PID disappeared on us.
    tolerated = (errno.ENOENT, errno.ESRCH, errno.EPERM, errno.EACCES)
    merged = {}
    for pid in pids():
        try:
            merged.update(self.get_proc_inodes(pid))
        except OSError as err:
            if err.errno in tolerated:
                continue
            raise
    return merged
def wrap_exceptions(fun):
    """Decorator which translates bare OSError and IOError exceptions
    into NoSuchProcess and AccessDenied.

    Errors carrying any other errno are re-raised unchanged.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            result = fun(self, *args, **kwargs)
        except EnvironmentError as err:
            code = err.errno
            if code in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if code in (errno.ENOENT, errno.ESRCH):
                raise NoSuchProcess(self.pid, self._name)
            raise
        return result
    return wrapper
def exe(self):
    """Return the target of the process' ``exe`` link under procfs.

    Returns "" when the link cannot be read (ENOENT / ESRCH) although the
    process directory exists.  Raises NoSuchProcess or ZombieProcess
    when the directory is missing too, and AccessDenied on EPERM /
    EACCES.  Other OSErrors propagate.
    """
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        code = err.errno
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        if code not in (errno.ENOENT, errno.ESRCH):
            raise
        # no such file error; might be raised also if the
        # path actually exists for system processes with
        # low pids (about 0-20)
        if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
            return ""
        if pid_exists(self.pid):
            raise ZombieProcess(self.pid, self._name, self._ppid)
        raise NoSuchProcess(self.pid, self._name)
def wrap_exceptions(fun):
    """Decorator which translates bare OSError exceptions into
    NoSuchProcess, ZombieProcess and AccessDenied.

    ESRCH becomes NoSuchProcess when the PID no longer exists and
    ZombieProcess when it still does; EPERM / EACCES become AccessDenied.
    Any other OSError is re-raised unchanged.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            result = fun(self, *args, **kwargs)
        except OSError as err:
            code = err.errno
            if code in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            if code != errno.ESRCH:
                raise
            if pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            raise NoSuchProcess(self.pid, self._name)
        return result
    return wrapper
def wrap_exceptions(fun):
    """Decorator which translates bare OSError exceptions into
    NoSuchProcess, ZombieProcess and AccessDenied.

    ESRCH becomes NoSuchProcess when the PID no longer exists and
    ZombieProcess when it still does; EPERM / EACCES become AccessDenied.
    Any other OSError is re-raised unchanged.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            outcome = fun(self, *args, **kwargs)
        except OSError as err:
            denied = err.errno in (errno.EPERM, errno.EACCES)
            if err.errno == errno.ESRCH:
                if pid_exists(self.pid):
                    raise ZombieProcess(self.pid, self._name, self._ppid)
                raise NoSuchProcess(self.pid, self._name)
            elif denied:
                raise AccessDenied(self.pid, self._name)
            raise
        return outcome
    return wrapper
def _send_signal(self, sig):
    """Send ``sig`` to ``self.pid`` with ``os.kill``.

    Raises ValueError for PID 0.  ESRCH is translated into
    ZombieProcess (on OPENBSD, when the PID still exists) or into
    NoSuchProcess after setting ``self._gone``; EPERM / EACCES become
    AccessDenied.  Other OSErrors propagate.
    """
    if self.pid == 0:
        # see "man 2 kill"
        raise ValueError(
            "preventing sending signal to process with PID 0 as it "
            "would affect every process in the process group of the "
            "calling process (os.getpid()) instead of PID 0")
    try:
        os.kill(self.pid, sig)
    except OSError as err:
        code = err.errno
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        if code != errno.ESRCH:
            raise
        if OPENBSD and pid_exists(self.pid):
            # We do this because os.kill() lies in case of
            # zombie processes.
            raise ZombieProcess(self.pid, self._name, self._ppid)
        self._gone = True
        raise NoSuchProcess(self.pid, self._name)
def wrap_exceptions(fun):
    """Call callable into a try/except clause and translate ENOENT,
    ESRCH, EACCES and EPERM in NoSuchProcess, ZombieProcess or
    AccessDenied exceptions.

    The returned wrapper keeps the name and docstring of ``fun``.
    Errors carrying any other errno are re-raised unchanged.
    """
    # Imported locally so the decorator does not rely on a module-level
    # import being present.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except EnvironmentError as err:
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if err.errno in (errno.ENOENT, errno.ESRCH):
                if not pid_exists(self.pid):
                    raise NoSuchProcess(self.pid, self._name)
                else:
                    raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def get_all_inodes(self):
    """Merge the results of ``get_proc_inodes`` for every PID in ``pids()``.

    PIDs whose lookup fails with ENOENT, ESRCH, EPERM or EACCES are
    skipped; any other ``OSError`` propagates.
    """
    # os.listdir() is gonna raise a lot of access denied
    # exceptions in case of unprivileged user; that's fine
    # as we'll just end up returning a connection with PID
    # and fd set to None anyway.
    # Both netstat -an and lsof does the same so it's
    # unlikely we can do any better.
    # ENOENT just means a PID disappeared on us.
    tolerated = (errno.ENOENT, errno.ESRCH, errno.EPERM, errno.EACCES)
    merged = {}
    for pid in pids():
        try:
            merged.update(self.get_proc_inodes(pid))
        except OSError as err:
            if err.errno in tolerated:
                continue
            raise
    return merged
def exe(self):
    """Return the target of the process' ``exe`` link under procfs.

    Returns "" when the link cannot be read (ENOENT / ESRCH) although the
    process directory exists.  Raises NoSuchProcess or ZombieProcess
    when the directory is missing too, and AccessDenied on EPERM /
    EACCES.  Other OSErrors propagate.
    """
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        code = err.errno
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        if code not in (errno.ENOENT, errno.ESRCH):
            raise
        # no such file error; might be raised also if the
        # path actually exists for system processes with
        # low pids (about 0-20)
        if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
            return ""
        if pid_exists(self.pid):
            raise ZombieProcess(self.pid, self._name, self._ppid)
        raise NoSuchProcess(self.pid, self._name)
def wrap_exceptions(fun):
    """Decorator which translates bare OSError exceptions into
    NoSuchProcess, ZombieProcess and AccessDenied.

    ESRCH becomes NoSuchProcess when the PID no longer exists and
    ZombieProcess when it still does; EPERM / EACCES become AccessDenied.
    Any other OSError is re-raised unchanged.
    """
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            result = fun(self, *args, **kwargs)
        except OSError as err:
            code = err.errno
            if code in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            if code != errno.ESRCH:
                raise
            if pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            raise NoSuchProcess(self.pid, self._name)
        return result
    return wrapper
def _send_signal(self, sig):
    """Send ``sig`` to ``self.pid`` with ``os.kill``.

    Raises ValueError for PID 0.  ESRCH is translated into
    ZombieProcess (on OPENBSD, when the PID still exists) or into
    NoSuchProcess after setting ``self._gone``; EPERM / EACCES become
    AccessDenied.  Other OSErrors propagate.
    """
    if self.pid == 0:
        # see "man 2 kill"
        raise ValueError(
            "preventing sending signal to process with PID 0 as it "
            "would affect every process in the process group of the "
            "calling process (os.getpid()) instead of PID 0")
    try:
        os.kill(self.pid, sig)
    except OSError as err:
        code = err.errno
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        if code != errno.ESRCH:
            raise
        if OPENBSD and pid_exists(self.pid):
            # We do this because os.kill() lies in case of
            # zombie processes.
            raise ZombieProcess(self.pid, self._name, self._ppid)
        self._gone = True
        raise NoSuchProcess(self.pid, self._name)
def wrap_exceptions(fun):
    """Call callable into a try/except clause and translate ENOENT,
    ESRCH, EACCES and EPERM in NoSuchProcess, ZombieProcess or
    AccessDenied exceptions.

    The returned wrapper keeps the name and docstring of ``fun``.
    Errors carrying any other errno are re-raised unchanged.
    """
    # Imported locally so the decorator does not rely on a module-level
    # import being present.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except EnvironmentError as err:
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if err.errno in (errno.ENOENT, errno.ESRCH):
                if not pid_exists(self.pid):
                    raise NoSuchProcess(self.pid, self._name)
                else:
                    raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def __init__(self, debug=False, logfile=None):
    """Create the 'VirtualBMC' logger and attach its output handler.

    :param debug: when true the logger level is DEBUG, otherwise INFO.
    :param logfile: path of a file to log to; when ``None`` a
        ``logging.StreamHandler`` is used instead.
    :raises IOError: if setting up the handler fails for any reason
        other than permission denied (``EACCES``).
    """
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()
        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)
        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    except IOError as e:
        # Only a permission failure is tolerated (the logger is then left
        # without handler and level).  Anything else used to be swallowed
        # silently by accident and is now reported to the caller.
        if e.errno != errno.EACCES:
            raise
def convert_errno(e):
    """
    Map an errno value (as found on an C{OSError} or C{IOError}) to a
    standard SFTP result code, for server code that traps exceptions and
    needs to report an appropriate result.

    @param e: an errno code, as from C{OSError.errno}.
    @type e: int
    @return: an SFTP error code like L{SFTP_NO_SUCH_FILE}.
    @rtype: int
    """
    if e == errno.EACCES:
        # permission denied
        result = SFTP_PERMISSION_DENIED
    elif e in (errno.ENOENT, errno.ENOTDIR):
        # no such file
        result = SFTP_NO_SUCH_FILE
    else:
        result = SFTP_FAILURE
    return result
def _convert_status(self, msg):
    """
    Read a status code and text from C{msg} and raise the matching
    exception: EOFError for SFTP_EOF, IOError otherwise.  Returns
    without raising when the status is SFTP_OK.
    """
    status = msg.get_int()
    message = msg.get_string()
    if status == SFTP_OK:
        return
    if status == SFTP_EOF:
        raise EOFError(message)
    # clever idea from john a. meinel: map the error codes to errno
    errno_by_status = (
        (SFTP_NO_SUCH_FILE, errno.ENOENT),
        (SFTP_PERMISSION_DENIED, errno.EACCES),
    )
    for sftp_code, os_code in errno_by_status:
        if status == sftp_code:
            raise IOError(os_code, message)
    raise IOError(message)
def _try_acquire(self, blocking, watch):
    """Make one attempt at ``self.trylock()``.

    Returns True on success.  When the lock is held elsewhere (EACCES or
    EAGAIN) returns False if the caller is not blocking or ``watch`` has
    expired, and raises ``_utils.RetryAgain`` otherwise.  Any other
    ``IOError`` is converted into ``threading.ThreadError``.
    """
    try:
        self.trylock()
    except IOError as e:
        contended = e.errno in (errno.EACCES, errno.EAGAIN)
        if not contended:
            details = {
                'path': self.path,
                'exception': e,
            }
            raise threading.ThreadError("Unable to acquire lock on"
                                        " `%(path)s` due to"
                                        " %(exception)s" % details)
        if blocking and not watch.expired():
            raise _utils.RetryAgain()
        return False
    return True