def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match, or ioctl() fails on the stdout file handle, we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except OSError as err:
if err.errno == errno.ENOTTY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
except UnsupportedOperation:
# Could not get the fileno for stdout, so we must be a daemon.
is_daemon = True
return is_daemon
python类ENOTTY的实例源码
def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match, or ioctl() fails on the stdout file handle, we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except io.UnsupportedOperation:
# Could not get the fileno for stdout, so we must be a daemon.
is_daemon = True
except OSError as err:
if err.errno == errno.ENOTTY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
return is_daemon
def test_does_not_crash(self):
"""Check if get_terminal_size() returns a meaningful value.
There's no easy portable way to actually check the size of the
terminal, so let's check if it returns something sensible instead.
"""
try:
size = os.get_terminal_size()
except OSError as e:
if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
# Under win32 a generic OSError can be thrown if the
# handle cannot be retrieved
self.skipTest("failed to query terminal size")
raise
self.assertGreaterEqual(size.columns, 0)
self.assertGreaterEqual(size.lines, 0)
def test_stty_match(self):
"""Check if stty returns the same results
stty actually tests stdin, so get_terminal_size is invoked on
stdin explicitly. If stty succeeded, then get_terminal_size()
should work too.
"""
try:
size = subprocess.check_output(['stty', 'size']).decode().split()
except (FileNotFoundError, subprocess.CalledProcessError):
self.skipTest("stty invocation failed")
expected = (int(size[1]), int(size[0])) # reversed order
try:
actual = os.get_terminal_size(sys.__stdin__.fileno())
except OSError as e:
if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
# Under win32 a generic OSError can be thrown if the
# handle cannot be retrieved
self.skipTest("failed to query terminal size")
raise
self.assertEqual(expected, actual)
def test_does_not_crash(self):
"""Check if get_terminal_size() returns a meaningful value.
There's no easy portable way to actually check the size of the
terminal, so let's check if it returns something sensible instead.
"""
try:
size = os.get_terminal_size()
except OSError as e:
if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
# Under win32 a generic OSError can be thrown if the
# handle cannot be retrieved
self.skipTest("failed to query terminal size")
raise
self.assertGreaterEqual(size.columns, 0)
self.assertGreaterEqual(size.lines, 0)
def test_stty_match(self):
"""Check if stty returns the same results
stty actually tests stdin, so get_terminal_size is invoked on
stdin explicitly. If stty succeeded, then get_terminal_size()
should work too.
"""
try:
size = subprocess.check_output(['stty', 'size']).decode().split()
except (FileNotFoundError, subprocess.CalledProcessError):
self.skipTest("stty invocation failed")
expected = (int(size[1]), int(size[0])) # reversed order
try:
actual = os.get_terminal_size(sys.__stdin__.fileno())
except OSError as e:
if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
# Under win32 a generic OSError can be thrown if the
# handle cannot be retrieved
self.skipTest("failed to query terminal size")
raise
self.assertEqual(expected, actual)
def test_does_not_crash(self):
"""Check if get_terminal_size() returns a meaningful value.
There's no easy portable way to actually check the size of the
terminal, so let's check if it returns something sensible instead.
"""
try:
size = os.get_terminal_size()
except OSError as e:
if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
# Under win32 a generic OSError can be thrown if the
# handle cannot be retrieved
self.skipTest("failed to query terminal size")
raise
self.assertGreaterEqual(size.columns, 0)
self.assertGreaterEqual(size.lines, 0)
def test_stty_match(self):
"""Check if stty returns the same results
stty actually tests stdin, so get_terminal_size is invoked on
stdin explicitly. If stty succeeded, then get_terminal_size()
should work too.
"""
try:
size = subprocess.check_output(['stty', 'size']).decode().split()
except (FileNotFoundError, subprocess.CalledProcessError):
self.skipTest("stty invocation failed")
expected = (int(size[1]), int(size[0])) # reversed order
try:
actual = os.get_terminal_size(sys.__stdin__.fileno())
except OSError as e:
if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
# Under win32 a generic OSError can be thrown if the
# handle cannot be retrieved
self.skipTest("failed to query terminal size")
raise
self.assertEqual(expected, actual)
def setxattr(self, path, xattr, valbytes, flags, position=0):
# flags from linux/xattr.h: XATTR_CREATE = 1, XATTR_REPLACE = 2
if flags or position:
raise TmfsOSError(errno.ENOSYS) # haven't actually seen it yet
# 'Extend' user.xxxx syntax and screen for it here
elems = xattr.split('.')
if elems[0] != 'user' or len(elems) < 2:
raise TmfsOSError(errno.EINVAL)
# Don't forget the setfattr command, and the shell it runs in, does
# things to a "numeric" argument. setfattr processes a leading
# 0x and does a byte-by-byte conversion, yielding a byte array.
# It needs pairs of digits and can be of arbitrary length. Any
# other argument ends up here as a pure string (well, byte array).
try:
value = valbytes.decode()
except ValueError as e:
# http://stackoverflow.com/questions/606191/convert-bytes-to-a-python-string
value = valbytes.decode('cp437')
rsp = self.librarian(
self.lcp('set_xattr', path=path,
xattr=xattr, value=value))
if rsp is not None: # unexpected
raise TmfsOSError(errno.ENOTTY)
def removexattr(self, path, xattr):
rsp = self.librarian(
self.lcp('remove_xattr', path=path, xattr=xattr))
if rsp is not None: # unexpected
raise TmfsOSError(errno.ENOTTY)
def open(self):
"""\
Open port with current settings. This may throw a SerialException
if the port cannot be opened."""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
if self.is_open:
raise SerialException("Port is already open.")
self.fd = None
# open
try:
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
try:
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
except IOError as e:
if e.errno in (errno.EINVAL, errno.ENOTTY):
# ignore Invalid argument and Inappropriate ioctl
pass
else:
raise
self.reset_input_buffer()
self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def open(self):
"""\
Open port with current settings. This may throw a SerialException
if the port cannot be opened."""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
if self.is_open:
raise SerialException("Port is already open.")
self.fd = None
# open
try:
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
try:
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
except IOError as e:
if e.errno in (errno.EINVAL, errno.ENOTTY):
# ignore Invalid argument and Inappropriate ioctl
pass
else:
raise
self.reset_input_buffer()
self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def open(self):
"""\
Open port with current settings. This may throw a SerialException
if the port cannot be opened."""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
if self.is_open:
raise SerialException("Port is already open.")
self.fd = None
# open
try:
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port {0}: {1}".format(self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
try:
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
except IOError as e:
if e.errno in (errno.EINVAL, errno.ENOTTY):
# ignore Invalid argument and Inappropriate ioctl
pass
else:
raise
self.reset_input_buffer()
self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def open(self):
"""\
Open port with current settings. This may throw a SerialException
if the port cannot be opened."""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
if self.is_open:
raise SerialException("Port is already open.")
self.fd = None
# open
try:
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
try:
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
except IOError as e:
if e.errno in (errno.EINVAL, errno.ENOTTY):
# ignore Invalid argument and Inappropriate ioctl
pass
else:
raise
self.reset_input_buffer()
self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def open(self):
"""\
Open port with current settings. This may throw a SerialException
if the port cannot be opened."""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
if self.is_open:
raise SerialException("Port is already open.")
self.fd = None
# open
try:
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
try:
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
except IOError as e:
if e.errno in (errno.EINVAL, errno.ENOTTY):
# ignore Invalid argument and Inappropriate ioctl
pass
else:
raise
self.reset_input_buffer()
self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)