def posix_error(filename):
    """Build an OSError from the errno value currently held by ctypes.

    :param filename: value stored on the exception's ``filename`` attribute.
    :returns: the constructed OSError (it is returned, not raised).
    """
    code = ctypes.get_errno()
    error = OSError(code, strerror(code))
    error.filename = filename
    return error
# Example source code for Python's get_errno() (python类get_errno()的实例源码)
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.
    A forced unmount (MNT_FORCE) is tried first; if that returns EBUSY
    a lazy unmount (MNT_DETACH) is attempted instead.

    :param mount_point: str or bytes. The filesystem mount point.
    :returns: int. Returns 0 on success. errno otherwise.
    """
    mnt_force = 1   # MNT_FORCE
    mnt_detach = 2  # MNT_DETACH

    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    # libc's umount() accepts only the target path, so a flags argument
    # passed to it is silently dropped. Flags require umount2().
    umount2 = libc.umount2
    umount2.argtypes = [ctypes.c_char_p, ctypes.c_int]
    umount2.restype = ctypes.c_int

    # The C function expects a byte string; text paths must be encoded
    # first, otherwise ctypes would hand over a wide-character buffer.
    if not isinstance(mount_point, bytes):
        import sys
        mount_point = mount_point.encode(
            sys.getfilesystemencoding() or "utf-8")

    # First try to umount with MNT_FORCE
    if umount2(mount_point, mnt_force) == 0:
        return 0
    err = ctypes.get_errno()
    if err != errno.EBUSY:
        return err

    # Still busy: detach from the tree, i.e. lazy umount
    if umount2(mount_point, mnt_detach) == 0:
        return 0
    return ctypes.get_errno()
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    code = ctypes.get_errno()
    return code
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    code = ctypes.get_errno()
    return code
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    code = ctypes.get_errno()
    return code
def sendfile(fdout, fdin, offset, nbytes):
    """Send bytes from one descriptor to another through ``_sendfile``.

    The call shape is chosen from ``sys.platform`` because the argument
    list differs between Darwin, FreeBSD/DragonFly and the remaining
    platforms (which use the four-argument form).

    :param fdout: descriptor that receives the data.
    :param fdin: descriptor the data is read from.
    :param offset: position in ``fdin`` at which reading starts.
    :param nbytes: number of bytes requested.
    :returns: number of bytes sent. On the Darwin and BSD branches this is
        the count written back by the call, including when it failed with
        EAGAIN.
    :raises OSError: when the call reports any other failure.
    """
    if sys.platform == 'darwin':
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_voidp,
                              ctypes.c_int]
        # In/out parameter: holds the request, receives the sent count.
        _nbytes = ctypes.c_uint64(nbytes)
        result = _sendfile(fdin, fdout, offset, _nbytes, None, 0)
        if result == -1:
            e = ctypes.get_errno()
            # EAGAIN still reports how much was sent before blocking.
            if e == errno.EAGAIN:
                return _nbytes.value
            raise OSError(e, os.strerror(e))
        return _nbytes.value
    elif sys.platform.startswith(('freebsd', 'dragonfly')):
        # sys.platform carries a version suffix on these systems (for
        # example 'freebsd13'), so an exact comparison would never match.
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.c_uint64, ctypes.c_voidp,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_int]
        # Out parameter receiving the number of bytes sent.
        _sbytes = ctypes.c_uint64()
        result = _sendfile(fdin, fdout, offset, nbytes, None, _sbytes, 0)
        if result == -1:
            e = ctypes.get_errno()
            if e == errno.EAGAIN:
                return _sbytes.value
            raise OSError(e, os.strerror(e))
        return _sbytes.value
    else:
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_size_t]
        _offset = ctypes.c_uint64(offset)
        sent = _sendfile(fdout, fdin, _offset, nbytes)
        if sent == -1:
            e = ctypes.get_errno()
            raise OSError(e, os.strerror(e))
        return sent
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    code = ctypes.get_errno()
    return code
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    code = ctypes.get_errno()
    return code
def get_errno(self):
    """
    Return the errno code obtained from ``self._get_errno()``, or None if
    no errno code is available.
    """
    code = self._get_errno()
    return code
def str_errno(self):
    """
    Describe the current errno as text.

    Returns a fixed notice when ``self.get_errno()`` yields None; otherwise
    combines the ``os.strerror`` message with the symbolic name looked up
    in ``errno.errorcode``.
    """
    code = self.get_errno()
    if code is not None:
        message = os.strerror(code)
        symbol = errno.errorcode[code]
        return 'Errno=%s (%s)' % (message, symbol)
    return 'Errno: no errno support'
def init(self):
    """
    Load the C library and declare the inotify function prototypes.

    Stores the loaded library on ``self._libc`` and ``ctypes.get_errno``
    on ``self._get_errno_func``. Returns False when the library lacks any
    of ``inotify_init``, ``inotify_add_watch`` or ``inotify_rm_watch``;
    otherwise sets their argument/return types and returns True.
    """
    assert ctypes

    # On FreeBSD the library looked up is 'inotify' instead of 'c'.
    if sys.platform.startswith('freebsd'):
        wanted = 'inotify'
    else:
        wanted = 'c'

    try:
        libc_name = ctypes.util.find_library(wanted)
    except (OSError, IOError):
        # Lookup failed: CDLL is still attempted below, with None.
        libc_name = None

    self._libc = ctypes.CDLL(libc_name, use_errno=True)
    self._get_errno_func = ctypes.get_errno

    # Check that the library exposes the needed inotify bindings.
    for symbol in ('inotify_init', 'inotify_add_watch', 'inotify_rm_watch'):
        if not hasattr(self._libc, symbol):
            return False

    lib = self._libc
    lib.inotify_init.argtypes = []
    lib.inotify_init.restype = ctypes.c_int
    lib.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p,
                                      ctypes.c_uint32]
    lib.inotify_add_watch.restype = ctypes.c_int
    lib.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
    lib.inotify_rm_watch.restype = ctypes.c_int
    return True
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.
    A forced unmount (MNT_FORCE) is tried first; if that returns EBUSY
    a lazy unmount (MNT_DETACH) is attempted instead.

    :param mount_point: str or bytes. The filesystem mount point.
    :returns: int. Returns 0 on success. errno otherwise.
    """
    mnt_force = 1   # MNT_FORCE
    mnt_detach = 2  # MNT_DETACH

    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    # libc's umount() accepts only the target path, so a flags argument
    # passed to it is silently dropped. Flags require umount2().
    umount2 = libc.umount2
    umount2.argtypes = [ctypes.c_char_p, ctypes.c_int]
    umount2.restype = ctypes.c_int

    # The C function expects a byte string; text paths must be encoded
    # first, otherwise ctypes would hand over a wide-character buffer.
    if not isinstance(mount_point, bytes):
        import sys
        mount_point = mount_point.encode(
            sys.getfilesystemencoding() or "utf-8")

    # First try to umount with MNT_FORCE
    if umount2(mount_point, mnt_force) == 0:
        return 0
    err = ctypes.get_errno()
    if err != errno.EBUSY:
        return err

    # Still busy: detach from the tree, i.e. lazy umount
    if umount2(mount_point, mnt_detach) == 0:
        return 0
    return ctypes.get_errno()
def get_errno(self):
    """
    Return the errno code obtained from ``self._get_errno()``, or None if
    no errno code is available.
    """
    code = self._get_errno()
    return code
def str_errno(self):
    """
    Describe the current errno as text.

    Returns a fixed notice when ``self.get_errno()`` yields None; otherwise
    combines the ``os.strerror`` message with the symbolic name looked up
    in ``errno.errorcode``.
    """
    code = self.get_errno()
    if code is not None:
        message = os.strerror(code)
        symbol = errno.errorcode[code]
        return 'Errno=%s (%s)' % (message, symbol)
    return 'Errno: no errno support'
def init(self):
    """
    Load the C library and declare the inotify function prototypes.

    Stores the loaded library on ``self._libc`` and ``ctypes.get_errno``
    on ``self._get_errno_func``. Returns False when the library lacks any
    of ``inotify_init``, ``inotify_add_watch`` or ``inotify_rm_watch``;
    otherwise sets their argument/return types and returns True.
    """
    assert ctypes

    # On FreeBSD the library looked up is 'inotify' instead of 'c'.
    if sys.platform.startswith('freebsd'):
        wanted = 'inotify'
    else:
        wanted = 'c'

    try:
        libc_name = ctypes.util.find_library(wanted)
    except (OSError, IOError):
        # Lookup failed: CDLL is still attempted below, with None.
        libc_name = None

    self._libc = ctypes.CDLL(libc_name, use_errno=True)
    self._get_errno_func = ctypes.get_errno

    # Check that the library exposes the needed inotify bindings.
    for symbol in ('inotify_init', 'inotify_add_watch', 'inotify_rm_watch'):
        if not hasattr(self._libc, symbol):
            return False

    lib = self._libc
    lib.inotify_init.argtypes = []
    lib.inotify_init.restype = ctypes.c_int
    lib.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p,
                                      ctypes.c_uint32]
    lib.inotify_add_watch.restype = ctypes.c_int
    lib.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
    lib.inotify_rm_watch.restype = ctypes.c_int
    return True
def _monotonic():  # noqa
    """Return the CLOCK_MONOTONIC reading as seconds.

    Raises OSError built from the ctypes errno when ``clock_gettime``
    returns a non-zero status.
    """
    ts = timespec()
    status = clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(ts))
    if status == 0:
        # Combine whole seconds with the nanosecond remainder.
        return ts.tv_sec + ts.tv_nsec * 1e-9
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
def die_if_parent_dies(signum=9):
    """Ask the kernel to signal this process when its parent dies.

    Does nothing when ``sys.platform`` does not contain 'linux'. Otherwise
    calls ``prctl(PR_SET_PDEATHSIG, signum)`` through libc.

    :param signum: signal number passed to prctl (default 9).
    :returns: True when prctl reports success; None in every other case
        (non-Linux platform, prctl failure, or an exception, which is
        written to stderr instead of being propagated).
    """
    if 'linux' not in sys.platform:
        return
    try:
        import ctypes
        libc = ctypes.CDLL('libc.so.6', use_errno=True)
        PR_SET_PDEATHSIG = 1
        result = libc.prctl(PR_SET_PDEATHSIG, signum)
        if result == 0:
            return True
        else:
            log('prctl failed: %s', os.strerror(ctypes.get_errno()))
    # "except X as e" parses on Python 2.6+ and 3; the former
    # "except StandardError, ex" form is a syntax error on Python 3.
    except Exception as ex:
        # Best effort: report the problem and carry on.
        sys.stderr.write(str(ex) + '\n')
def sendfile(fdout, fdin, offset, nbytes):
    """Send bytes from one descriptor to another through ``_sendfile``.

    The call shape is chosen from ``sys.platform`` because the argument
    list differs between Darwin, FreeBSD/DragonFly and the remaining
    platforms (which use the four-argument form).

    :param fdout: descriptor that receives the data.
    :param fdin: descriptor the data is read from.
    :param offset: position in ``fdin`` at which reading starts.
    :param nbytes: number of bytes requested.
    :returns: number of bytes sent. On the Darwin and BSD branches this is
        the count written back by the call, including when it failed with
        EAGAIN.
    :raises OSError: when the call reports any other failure.
    """
    if sys.platform == 'darwin':
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_voidp,
                              ctypes.c_int]
        # In/out parameter: holds the request, receives the sent count.
        _nbytes = ctypes.c_uint64(nbytes)
        result = _sendfile(fdin, fdout, offset, _nbytes, None, 0)
        if result == -1:
            e = ctypes.get_errno()
            # EAGAIN still reports how much was sent before blocking.
            if e == errno.EAGAIN:
                return _nbytes.value
            raise OSError(e, os.strerror(e))
        return _nbytes.value
    elif sys.platform.startswith(('freebsd', 'dragonfly')):
        # sys.platform carries a version suffix on these systems (for
        # example 'freebsd13'), so an exact comparison would never match.
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.c_uint64, ctypes.c_voidp,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_int]
        # Out parameter receiving the number of bytes sent.
        _sbytes = ctypes.c_uint64()
        result = _sendfile(fdin, fdout, offset, nbytes, None, _sbytes, 0)
        if result == -1:
            e = ctypes.get_errno()
            if e == errno.EAGAIN:
                return _sbytes.value
            raise OSError(e, os.strerror(e))
        return _sbytes.value
    else:
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_size_t]
        _offset = ctypes.c_uint64(offset)
        sent = _sendfile(fdout, fdin, _offset, nbytes)
        if sent == -1:
            e = ctypes.get_errno()
            raise OSError(e, os.strerror(e))
        return sent
def sendfile(fdout, fdin, offset, nbytes):
    """Send bytes from one descriptor to another through ``_sendfile``.

    The call shape is chosen from ``sys.platform`` because the argument
    list differs between Darwin, FreeBSD/DragonFly and the remaining
    platforms (which use the four-argument form).

    :param fdout: descriptor that receives the data.
    :param fdin: descriptor the data is read from.
    :param offset: position in ``fdin`` at which reading starts.
    :param nbytes: number of bytes requested.
    :returns: number of bytes sent. On the Darwin and BSD branches this is
        the count written back by the call, including when it failed with
        EAGAIN.
    :raises OSError: when the call reports any other failure.
    """
    if sys.platform == 'darwin':
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_voidp,
                              ctypes.c_int]
        # In/out parameter: holds the request, receives the sent count.
        _nbytes = ctypes.c_uint64(nbytes)
        result = _sendfile(fdin, fdout, offset, _nbytes, None, 0)
        if result == -1:
            e = ctypes.get_errno()
            # EAGAIN still reports how much was sent before blocking.
            if e == errno.EAGAIN:
                return _nbytes.value
            raise OSError(e, os.strerror(e))
        return _nbytes.value
    elif sys.platform.startswith(('freebsd', 'dragonfly')):
        # sys.platform carries a version suffix on these systems (for
        # example 'freebsd13'), so an exact comparison would never match.
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.c_uint64, ctypes.c_voidp,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_int]
        # Out parameter receiving the number of bytes sent.
        _sbytes = ctypes.c_uint64()
        result = _sendfile(fdin, fdout, offset, nbytes, None, _sbytes, 0)
        if result == -1:
            e = ctypes.get_errno()
            if e == errno.EAGAIN:
                return _sbytes.value
            raise OSError(e, os.strerror(e))
        return _sbytes.value
    else:
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_size_t]
        _offset = ctypes.c_uint64(offset)
        sent = _sendfile(fdout, fdin, _offset, nbytes)
        if sent == -1:
            e = ctypes.get_errno()
            raise OSError(e, os.strerror(e))
        return sent
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.
    A forced unmount (MNT_FORCE) is tried first; if that returns EBUSY
    a lazy unmount (MNT_DETACH) is attempted instead.

    :param mount_point: str or bytes. The filesystem mount point.
    :returns: int. Returns 0 on success. errno otherwise.
    """
    mnt_force = 1   # MNT_FORCE
    mnt_detach = 2  # MNT_DETACH

    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    # libc's umount() accepts only the target path, so a flags argument
    # passed to it is silently dropped. Flags require umount2().
    umount2 = libc.umount2
    umount2.argtypes = [ctypes.c_char_p, ctypes.c_int]
    umount2.restype = ctypes.c_int

    # The C function expects a byte string; text paths must be encoded
    # first, otherwise ctypes would hand over a wide-character buffer.
    if not isinstance(mount_point, bytes):
        import sys
        mount_point = mount_point.encode(
            sys.getfilesystemencoding() or "utf-8")

    # First try to umount with MNT_FORCE
    if umount2(mount_point, mnt_force) == 0:
        return 0
    err = ctypes.get_errno()
    if err != errno.EBUSY:
        return err

    # Still busy: detach from the tree, i.e. lazy umount
    if umount2(mount_point, mnt_detach) == 0:
        return 0
    return ctypes.get_errno()